status/core/db_prototype.go

200 lines
4.5 KiB
Go
Raw Permalink Normal View History

2023-10-03 10:22:01 +00:00
package core
2023-10-03 12:44:55 +00:00
import (
2023-10-03 18:23:25 +00:00
"bytes"
"encoding/json"
"fmt"
2023-10-03 13:23:57 +00:00
"log"
2023-10-03 18:23:25 +00:00
"os"
"path"
2023-10-03 13:23:57 +00:00
"sort"
2023-10-03 18:23:25 +00:00
"sync"
2023-10-03 12:44:55 +00:00
"time"
2023-10-03 18:23:25 +00:00
"github.com/oklog/ulid/v2"
2024-07-07 03:30:12 +00:00
. "git.exozy.me/exozyme/status/scanner"
2023-10-03 12:44:55 +00:00
)
2024-07-07 03:30:12 +00:00
type PrototypeDb struct {
2023-10-03 18:23:25 +00:00
rootDir string
rows []Row
mutex sync.Mutex
2023-10-03 13:23:57 +00:00
}
2023-10-04 09:55:08 +00:00
// Load the newest backup, or start anew
2024-07-07 03:30:12 +00:00
func OpenPrototypeDb(dir string) (Database, error) {
2023-10-03 18:23:25 +00:00
if dir == "" {
return nil, fmt.Errorf("database path is empty. did you set it in config file?")
}
err := os.MkdirAll(dir, 0755)
if err != nil {
return nil, err
}
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
// sort the file name desc. Since the file names are ULID, newer files are sorted first
sort.Slice(files, func(i, j int) bool {
return files[i].Name() > files[j].Name()
})
for _, file := range files {
filename := file.Name()
_, err := ulid.Parse(filename)
if err != nil {
2023-10-04 09:55:08 +00:00
continue // because database files have ULID as names
2023-10-03 18:23:25 +00:00
}
fullpath := path.Join(dir, filename)
content, err := os.ReadFile(fullpath)
if err != nil {
log.Printf("[WARN] db file cannot be read: %v", err)
continue
}
var rows []Row
err = json.NewDecoder(bytes.NewReader(content)).Decode(&rows)
if err != nil {
log.Printf("[WARN] db file content cannot be parsed: %v", err)
continue
}
log.Printf("Loaded %d rows from: %v", len(rows), fullpath)
2024-07-07 03:30:12 +00:00
return &PrototypeDb{rootDir: dir, rows: rows}, nil
2023-10-03 18:23:25 +00:00
}
2024-07-07 03:30:12 +00:00
return &PrototypeDb{rootDir: dir, rows: []Row{}}, nil
2023-10-03 12:44:55 +00:00
}
2024-07-07 03:30:12 +00:00
func (db *PrototypeDb) sortRows() {
2023-10-03 13:23:57 +00:00
sort.Slice(db.rows, func(i, j int) bool {
return db.rows[i].time.Compare(db.rows[j].time) == -1
})
}
2023-10-04 09:55:08 +00:00
// Dump data to file
//
// Steps:
// create -> write -> fsync -> rename
2024-07-07 03:30:12 +00:00
func (db *PrototypeDb) Persist() error {
2023-10-03 18:23:25 +00:00
db.mutex.Lock()
defer db.mutex.Unlock()
db.sortRows()
2023-10-08 09:54:36 +00:00
dir := db.rootDir
// [section] clean up old checkpoints
files, err := os.ReadDir(dir)
if err != nil {
return err
}
// sort the file name desc. Since the file names are ULID, newer files are sorted first
sort.Slice(files, func(i, j int) bool {
return files[i].Name() > files[j].Name()
})
// delete old files
if len(files) > DbCheckpointsToKeep {
for _, file := range files[DbCheckpointsToKeep:] {
2023-10-08 09:54:36 +00:00
filename := file.Name()
_, err := ulid.Parse(filename)
if err != nil {
continue // because database files have ULID as names
}
fullpath := path.Join(dir, filename)
err = os.Remove(fullpath)
if err != nil {
log.Printf("Error when os.Delete: %v", err)
}
}
}
id := ulid.Make().String() // checkpoint filename is ULID
2023-10-03 18:23:25 +00:00
filename_final := path.Join(db.rootDir, id)
filename_tmp := path.Join(db.rootDir, id+".tmp")
2023-10-08 09:54:36 +00:00
2023-10-03 18:23:25 +00:00
file, err := os.Create(filename_tmp)
if err != nil {
return err
}
defer file.Close()
2023-10-04 09:55:08 +00:00
err = json.NewEncoder(file).Encode(db.rows) // file content is JSON
2023-10-03 18:23:25 +00:00
if err != nil {
return err
}
err = file.Sync()
if err != nil {
return err
}
err = os.Rename(filename_tmp, filename_final)
if err != nil {
return err
}
log.Printf("Saved %d rows to: %v", len(db.rows), filename_final)
2023-10-08 09:54:36 +00:00
2023-10-03 18:23:25 +00:00
return nil
}
2024-07-07 03:30:12 +00:00
func (db *PrototypeDb) QueryAllAfter(cutoff time.Time) ([]Row, error) {
2023-10-03 18:23:25 +00:00
db.mutex.Lock()
defer db.mutex.Unlock()
2023-10-04 09:55:08 +00:00
2023-10-03 13:23:57 +00:00
db.sortRows()
var start int = -1
for i, v := range db.rows {
if v.time.Compare(cutoff) >= 0 {
start = i
break
}
}
if start == -1 {
return []Row{}, nil
}
return db.rows[start:], nil
}
2024-07-07 03:30:12 +00:00
func (db *PrototypeDb) StoreServiceStatusBatch(entries ServiceStatusBatch, when time.Time) ([]ServiceStatus, error) {
2023-10-07 08:24:49 +00:00
non_duplicates := []ServiceStatus{}
2023-10-03 18:23:25 +00:00
db.mutex.Lock()
defer db.mutex.Unlock()
2023-10-04 09:55:08 +00:00
2023-10-03 13:23:57 +00:00
for _, v := range entries {
new_row := Row{when, EventService, v}
2023-10-03 15:11:46 +00:00
is_duplicate := false
for i := len(db.rows) - 1; i >= 0; i-- {
row := db.rows[i]
// not duplicate if server restarted in-between
if row.kind == EventSelfUp || row.kind == EventSelfDown {
break
}
if row.kind == new_row.kind && row.data.Url == new_row.data.Url {
2023-10-07 08:39:34 +00:00
is_duplicate = row.data == new_row.data
break
2023-10-03 13:23:57 +00:00
}
2023-10-03 15:11:46 +00:00
}
if !is_duplicate {
2023-10-03 13:23:57 +00:00
db.rows = append(db.rows, new_row)
non_duplicates = append(non_duplicates, v)
2023-10-03 13:23:57 +00:00
}
}
2023-10-07 08:24:49 +00:00
return non_duplicates, nil
2023-10-03 12:44:55 +00:00
}
2024-07-07 03:30:12 +00:00
func (db *PrototypeDb) StoreSelfStatus(status EventType, when time.Time) error {
2023-10-03 18:23:25 +00:00
db.mutex.Lock()
defer db.mutex.Unlock()
2023-10-04 09:55:08 +00:00
2023-10-03 13:23:57 +00:00
if status != EventSelfUp && status != EventSelfDown {
log.Panicf("(StoreSelfStatus) invalid value: %v", status)
}
2023-10-07 08:24:49 +00:00
var placeholder ServiceStatus
2023-10-03 13:23:57 +00:00
new_row := Row{when, EventType(status), placeholder}
db.rows = append(db.rows, new_row)
if len(db.rows) > DbEntriesToKeep {
db.rows = db.rows[len(db.rows)-DbEntriesToKeep:]
2023-10-03 13:49:49 +00:00
}
2023-10-03 12:44:55 +00:00
return nil
}