Add persistence
This commit is contained in:
parent
72a454df58
commit
63c9738b84
5 changed files with 112 additions and 12 deletions
110
core/database.go
110
core/database.go
|
@ -1,10 +1,19 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
||||
"git.exozy.me/exozyme/status/scanner"
|
||||
)
|
||||
|
||||
|
@ -22,26 +31,70 @@ const (
|
|||
EventSelfDown
|
||||
)
|
||||
|
||||
type SelfStatusUpdate EventType // I want real enum...
|
||||
// either EventSelfUp or EventSelfDown
|
||||
// I want real enum...
|
||||
type SelfStatusUpdate EventType
|
||||
|
||||
type Database interface {
|
||||
// dump data to disk
|
||||
Persist() error
|
||||
// get all records after some time (`cutoff`)
|
||||
QueryAllAfter(url string, cutoff time.Time) ([]Row, error)
|
||||
// store records with timestamp
|
||||
// store service status records with timestamp
|
||||
StoreServiceStatusBatch(entries scanner.ServiceStatusBatch, when time.Time) error
|
||||
// store self up/down event with timestamp
|
||||
StoreSelfStatus(status SelfStatusUpdate, when time.Time) error
|
||||
}
|
||||
|
||||
type dumbDatabase struct {
|
||||
rows []Row
|
||||
rootDir string
|
||||
rows []Row
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func OpenDatabase(path string) (Database, error) {
|
||||
// use this code in a real database
|
||||
// if path == "" {
|
||||
// return nil, fmt.Errorf("database path is empty. did you set it in config file?")
|
||||
// }
|
||||
return &dumbDatabase{}, nil
|
||||
func OpenDatabase(dir string) (Database, error) {
|
||||
if dir == "" {
|
||||
return nil, fmt.Errorf("database path is empty. did you set it in config file?")
|
||||
}
|
||||
err := os.MkdirAll(dir, 0755)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
files, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// sort the file name desc. Since the file names are ULID, newer files are sorted first
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].Name() > files[j].Name()
|
||||
})
|
||||
for _, file := range files {
|
||||
filename := file.Name()
|
||||
if strings.HasSuffix(filename, ".tmp") {
|
||||
continue
|
||||
}
|
||||
_, err := ulid.Parse(filename)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fullpath := path.Join(dir, filename)
|
||||
content, err := os.ReadFile(fullpath)
|
||||
if err != nil {
|
||||
log.Printf("[WARN] db file cannot be read: %v", err)
|
||||
continue
|
||||
}
|
||||
var rows []Row
|
||||
err = json.NewDecoder(bytes.NewReader(content)).Decode(&rows)
|
||||
if err != nil {
|
||||
log.Printf("[WARN] db file content cannot be parsed: %v", err)
|
||||
continue
|
||||
}
|
||||
log.Printf("Loaded %d rows from: %v", len(rows), fullpath)
|
||||
|
||||
return &dumbDatabase{rootDir: dir, rows: rows}, nil
|
||||
}
|
||||
|
||||
return &dumbDatabase{rootDir: dir, rows: []Row{}}, nil
|
||||
}
|
||||
|
||||
func (db *dumbDatabase) sortRows() {
|
||||
|
@ -50,7 +103,40 @@ func (db *dumbDatabase) sortRows() {
|
|||
})
|
||||
}
|
||||
|
||||
func (db *dumbDatabase) Persist() error {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
db.sortRows()
|
||||
id := ulid.Make().String()
|
||||
|
||||
filename_final := path.Join(db.rootDir, id)
|
||||
filename_tmp := path.Join(db.rootDir, id+".tmp")
|
||||
file, err := os.Create(filename_tmp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
err = json.NewEncoder(file).Encode(db.rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = file.Sync()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.Rename(filename_tmp, filename_final)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Saved %d rows to: %v", len(db.rows), filename_final)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *dumbDatabase) QueryAllAfter(url string, cutoff time.Time) ([]Row, error) {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
db.sortRows()
|
||||
|
||||
var start int = -1
|
||||
|
@ -68,6 +154,9 @@ func (db *dumbDatabase) QueryAllAfter(url string, cutoff time.Time) ([]Row, erro
|
|||
}
|
||||
|
||||
func (db *dumbDatabase) StoreServiceStatusBatch(entries scanner.ServiceStatusBatch, when time.Time) error {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
for _, v := range entries {
|
||||
new_row := Row{when, EventService, v}
|
||||
|
||||
|
@ -95,6 +184,9 @@ func (db *dumbDatabase) StoreServiceStatusBatch(entries scanner.ServiceStatusBat
|
|||
const max_rows = 10000
|
||||
|
||||
func (db *dumbDatabase) StoreSelfStatus(status SelfStatusUpdate, when time.Time) error {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
if status != EventSelfUp && status != EventSelfDown {
|
||||
log.Panicf("(StoreSelfStatus) invalid value: %v", status)
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -4,6 +4,7 @@ go 1.21.1
|
|||
|
||||
require (
|
||||
github.com/cbroglie/mustache v1.4.0
|
||||
github.com/oklog/ulid/v2 v2.1.0
|
||||
github.com/pelletier/go-toml/v2 v2.1.0
|
||||
github.com/prometheus-community/pro-bing v0.3.0
|
||||
maunium.net/go/mautrix v0.16.1
|
||||
|
|
3
go.sum
3
go.sum
|
@ -11,6 +11,9 @@ github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZb
|
|||
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
|
||||
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
|
||||
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
||||
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
|
8
main.go
8
main.go
|
@ -87,6 +87,7 @@ var dbm core.Database
|
|||
func cleanup() {
|
||||
log.Println("Cleaning up")
|
||||
_ = dbm.StoreSelfStatus(core.EventSelfDown, time.Now())
|
||||
_ = dbm.Persist()
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -151,7 +152,7 @@ func main() {
|
|||
|
||||
// log startup and register cleanup (on SIGTERM or panic)
|
||||
ch_sigterm := make(chan os.Signal)
|
||||
os_signal.Notify(ch_sigterm, syscall.SIGTERM)
|
||||
os_signal.Notify(ch_sigterm, syscall.SIGTERM, syscall.SIGINT)
|
||||
|
||||
log.Print("Starting up")
|
||||
err = dbm.StoreSelfStatus(core.EventSelfUp, time.Now())
|
||||
|
@ -163,7 +164,9 @@ func main() {
|
|||
go func() {
|
||||
<-ch_sigterm
|
||||
err := srv.Shutdown(context.Background())
|
||||
log.Printf("Error when shutting down: %v", err)
|
||||
if err != nil {
|
||||
log.Printf("Error when shutting down: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// timed update
|
||||
|
@ -173,6 +176,7 @@ func main() {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
data := scanner.CheckServiceBatch(ctx, config.Service)
|
||||
dbm.StoreServiceStatusBatch(data, time.Now())
|
||||
// log.Printf("Routine check: %v", data)
|
||||
<-ctx.Done()
|
||||
cancel()
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ Services
|
|||
- store events
|
||||
- [x] self boot up/shutdown
|
||||
- [x] check service status every 5 minutes since boot
|
||||
- [ ] persist data on disk; use a better database?
|
||||
- [x] persist data on disk; use a better database?
|
||||
- [-] Matrix notification (not integrated)
|
||||
|
||||
## Environment Variables
|
||||
|
|
Loading…
Reference in a new issue