status/core/db_prototype.go
2024-07-07 03:31:34 +00:00

199 lines
4.5 KiB
Go

package core
import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"path"
"sort"
"sync"
"time"
"github.com/oklog/ulid/v2"
. "git.exozy.me/exozyme/status/scanner"
)
type PrototypeDb struct {
rootDir string
rows []Row
mutex sync.Mutex
}
// Load the newest backup, or start anew
func OpenPrototypeDb(dir string) (Database, error) {
if dir == "" {
return nil, fmt.Errorf("database path is empty. did you set it in config file?")
}
err := os.MkdirAll(dir, 0755)
if err != nil {
return nil, err
}
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
// sort the file name desc. Since the file names are ULID, newer files are sorted first
sort.Slice(files, func(i, j int) bool {
return files[i].Name() > files[j].Name()
})
for _, file := range files {
filename := file.Name()
_, err := ulid.Parse(filename)
if err != nil {
continue // because database files have ULID as names
}
fullpath := path.Join(dir, filename)
content, err := os.ReadFile(fullpath)
if err != nil {
log.Printf("[WARN] db file cannot be read: %v", err)
continue
}
var rows []Row
err = json.NewDecoder(bytes.NewReader(content)).Decode(&rows)
if err != nil {
log.Printf("[WARN] db file content cannot be parsed: %v", err)
continue
}
log.Printf("Loaded %d rows from: %v", len(rows), fullpath)
return &PrototypeDb{rootDir: dir, rows: rows}, nil
}
return &PrototypeDb{rootDir: dir, rows: []Row{}}, nil
}
func (db *PrototypeDb) sortRows() {
sort.Slice(db.rows, func(i, j int) bool {
return db.rows[i].time.Compare(db.rows[j].time) == -1
})
}
// Dump data to file
//
// Steps:
// create -> write -> fsync -> rename
func (db *PrototypeDb) Persist() error {
db.mutex.Lock()
defer db.mutex.Unlock()
db.sortRows()
dir := db.rootDir
// [section] clean up old checkpoints
files, err := os.ReadDir(dir)
if err != nil {
return err
}
// sort the file name desc. Since the file names are ULID, newer files are sorted first
sort.Slice(files, func(i, j int) bool {
return files[i].Name() > files[j].Name()
})
// delete old files
if len(files) > DbCheckpointsToKeep {
for _, file := range files[DbCheckpointsToKeep:] {
filename := file.Name()
_, err := ulid.Parse(filename)
if err != nil {
continue // because database files have ULID as names
}
fullpath := path.Join(dir, filename)
err = os.Remove(fullpath)
if err != nil {
log.Printf("Error when os.Delete: %v", err)
}
}
}
id := ulid.Make().String() // checkpoint filename is ULID
filename_final := path.Join(db.rootDir, id)
filename_tmp := path.Join(db.rootDir, id+".tmp")
file, err := os.Create(filename_tmp)
if err != nil {
return err
}
defer file.Close()
err = json.NewEncoder(file).Encode(db.rows) // file content is JSON
if err != nil {
return err
}
err = file.Sync()
if err != nil {
return err
}
err = os.Rename(filename_tmp, filename_final)
if err != nil {
return err
}
log.Printf("Saved %d rows to: %v", len(db.rows), filename_final)
return nil
}
func (db *PrototypeDb) QueryAllAfter(cutoff time.Time) ([]Row, error) {
db.mutex.Lock()
defer db.mutex.Unlock()
db.sortRows()
var start int = -1
for i, v := range db.rows {
if v.time.Compare(cutoff) >= 0 {
start = i
break
}
}
if start == -1 {
return []Row{}, nil
}
return db.rows[start:], nil
}
func (db *PrototypeDb) StoreServiceStatusBatch(entries ServiceStatusBatch, when time.Time) ([]ServiceStatus, error) {
non_duplicates := []ServiceStatus{}
db.mutex.Lock()
defer db.mutex.Unlock()
for _, v := range entries {
new_row := Row{when, EventService, v}
is_duplicate := false
for i := len(db.rows) - 1; i >= 0; i-- {
row := db.rows[i]
// not duplicate if server restarted in-between
if row.kind == EventSelfUp || row.kind == EventSelfDown {
break
}
if row.kind == new_row.kind && row.data.Url == new_row.data.Url {
is_duplicate = row.data == new_row.data
break
}
}
if !is_duplicate {
db.rows = append(db.rows, new_row)
non_duplicates = append(non_duplicates, v)
}
}
return non_duplicates, nil
}
func (db *PrototypeDb) StoreSelfStatus(status EventType, when time.Time) error {
db.mutex.Lock()
defer db.mutex.Unlock()
if status != EventSelfUp && status != EventSelfDown {
log.Panicf("(StoreSelfStatus) invalid value: %v", status)
}
var placeholder ServiceStatus
new_row := Row{when, EventType(status), placeholder}
db.rows = append(db.rows, new_row)
if len(db.rows) > DbEntriesToKeep {
db.rows = db.rows[len(db.rows)-DbEntriesToKeep:]
}
return nil
}