7a271ea070
configurable in core/magicnumber.go
280 lines
6.4 KiB
Go
280 lines
6.4 KiB
Go
package core
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid/v2"
|
|
|
|
. "git.exozy.me/exozyme/status/scanner"
|
|
)
|
|
|
|
type Row struct {
|
|
time time.Time
|
|
kind EventType
|
|
data ServiceStatus
|
|
}
|
|
|
|
// EventType classifies a Row: either a watched service changed status,
// or this program itself started or stopped.
type EventType int

const (
	// represents when a service's status changes
	EventService EventType = iota
	// represents when this program starts
	EventSelfUp
	// represents when this program stops
	EventSelfDown
)
|
|
|
|
// Database stores timestamped status events and can persist them to disk.
type Database interface {
	// dump data to disk
	Persist() error

	// get all records at-or-after some time (`cutoff`)
	//
	// Usage: (get past 7 days of history)
	// db.QueryAllAfter(time.Now().Add(-7 * 24 * time.Hour))
	//
	// Then, parse the result with `CalculateServiceStatusSpans(rows, service_url)`
	QueryAllAfter(cutoff time.Time) ([]Row, error)

	// store service status records with timestamp
	// returns non-duplicate rows
	StoreServiceStatusBatch(entries ServiceStatusBatch, when time.Time) ([]ServiceStatus, error)

	// store self up/down event with timestamp
	//
	// `status` is EventSelfUp or EventSelfDown
	StoreSelfStatus(status EventType, when time.Time) error
}
|
|
|
|
// Span is a period of time during which a service held a single status.
// Produced by CalculateServiceStatusSpans.
type Span struct {
	start time.Time // when this status was first observed
	stop time.Time // when this status ended (time.Now() if still current)
	status ServiceStatus // the service's status throughout the span
}
|
|
|
|
func append_close_span(res *[]Span, rows []Row, start_i, stop_i int, status ServiceStatus) {
|
|
var stop_time time.Time
|
|
if stop_i < 0 {
|
|
stop_time = time.Now()
|
|
} else {
|
|
stop_time = rows[stop_i].time
|
|
}
|
|
start_time := rows[start_i].time
|
|
*res = append(*res, Span{start_time, stop_time, status})
|
|
}
|
|
|
|
// Find when the service is up, and when the service is down
|
|
// returns timespans. There might be gaps between two spans, meaning that we don't know the state of that service
|
|
func CalculateServiceStatusSpans(rows []Row, service_url string) []Span {
|
|
res := []Span{}
|
|
start_i := -1
|
|
var status ServiceStatus
|
|
|
|
for i, v := range rows {
|
|
switch v.kind {
|
|
case EventSelfDown:
|
|
append_close_span(&res, rows, start_i, i, status)
|
|
start_i = -1
|
|
case EventService:
|
|
if v.data.Url == service_url {
|
|
append_close_span(&res, rows, start_i, i, status)
|
|
start_i = i
|
|
status = v.data
|
|
}
|
|
}
|
|
}
|
|
append_close_span(&res, rows, start_i, -1, status)
|
|
|
|
return res
|
|
}
|
|
|
|
// dumbDatabase is a simple in-memory Database implementation that
// checkpoints all rows to JSON files on disk (see Persist).
type dumbDatabase struct {
	rootDir string // directory holding checkpoint files (ULID filenames)
	rows []Row // full event history, kept entirely in memory
	mutex sync.Mutex // guards rows against concurrent access
}
|
|
|
|
// Load the newest backup, or start anew
|
|
func OpenDatabase(dir string) (Database, error) {
|
|
if dir == "" {
|
|
return nil, fmt.Errorf("database path is empty. did you set it in config file?")
|
|
}
|
|
err := os.MkdirAll(dir, 0755)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// sort the file name desc. Since the file names are ULID, newer files are sorted first
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Name() > files[j].Name()
|
|
})
|
|
for _, file := range files {
|
|
filename := file.Name()
|
|
_, err := ulid.Parse(filename)
|
|
if err != nil {
|
|
continue // because database files have ULID as names
|
|
}
|
|
fullpath := path.Join(dir, filename)
|
|
content, err := os.ReadFile(fullpath)
|
|
if err != nil {
|
|
log.Printf("[WARN] db file cannot be read: %v", err)
|
|
continue
|
|
}
|
|
var rows []Row
|
|
err = json.NewDecoder(bytes.NewReader(content)).Decode(&rows)
|
|
if err != nil {
|
|
log.Printf("[WARN] db file content cannot be parsed: %v", err)
|
|
continue
|
|
}
|
|
log.Printf("Loaded %d rows from: %v", len(rows), fullpath)
|
|
|
|
return &dumbDatabase{rootDir: dir, rows: rows}, nil
|
|
}
|
|
|
|
return &dumbDatabase{rootDir: dir, rows: []Row{}}, nil
|
|
}
|
|
|
|
func (db *dumbDatabase) sortRows() {
|
|
sort.Slice(db.rows, func(i, j int) bool {
|
|
return db.rows[i].time.Compare(db.rows[j].time) == -1
|
|
})
|
|
}
|
|
|
|
// Dump data to file
|
|
//
|
|
// Steps:
|
|
// create -> write -> fsync -> rename
|
|
func (db *dumbDatabase) Persist() error {
|
|
db.mutex.Lock()
|
|
defer db.mutex.Unlock()
|
|
|
|
db.sortRows()
|
|
|
|
dir := db.rootDir
|
|
// [section] clean up old checkpoints
|
|
files, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// sort the file name desc. Since the file names are ULID, newer files are sorted first
|
|
sort.Slice(files, func(i, j int) bool {
|
|
return files[i].Name() > files[j].Name()
|
|
})
|
|
// delete old files
|
|
if len(files) > DbCheckpointsToKeep {
|
|
for _, file := range files[DbCheckpointsToKeep:] {
|
|
filename := file.Name()
|
|
_, err := ulid.Parse(filename)
|
|
if err != nil {
|
|
continue // because database files have ULID as names
|
|
}
|
|
fullpath := path.Join(dir, filename)
|
|
err = os.Remove(fullpath)
|
|
if err != nil {
|
|
log.Printf("Error when os.Delete: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
id := ulid.Make().String() // checkpoint filename is ULID
|
|
filename_final := path.Join(db.rootDir, id)
|
|
filename_tmp := path.Join(db.rootDir, id+".tmp")
|
|
|
|
file, err := os.Create(filename_tmp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
err = json.NewEncoder(file).Encode(db.rows) // file content is JSON
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = file.Sync()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = os.Rename(filename_tmp, filename_final)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Printf("Saved %d rows to: %v", len(db.rows), filename_final)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *dumbDatabase) QueryAllAfter(cutoff time.Time) ([]Row, error) {
|
|
db.mutex.Lock()
|
|
defer db.mutex.Unlock()
|
|
|
|
db.sortRows()
|
|
|
|
var start int = -1
|
|
for i, v := range db.rows {
|
|
if v.time.Compare(cutoff) >= 0 {
|
|
start = i
|
|
break
|
|
}
|
|
}
|
|
|
|
if start == -1 {
|
|
return []Row{}, nil
|
|
}
|
|
return db.rows[start:], nil
|
|
}
|
|
|
|
func (db *dumbDatabase) StoreServiceStatusBatch(entries ServiceStatusBatch, when time.Time) ([]ServiceStatus, error) {
|
|
non_duplicates := []ServiceStatus{}
|
|
|
|
db.mutex.Lock()
|
|
defer db.mutex.Unlock()
|
|
|
|
for _, v := range entries {
|
|
new_row := Row{when, EventService, v}
|
|
|
|
is_duplicate := false
|
|
for i := len(db.rows) - 1; i >= 0; i-- {
|
|
row := db.rows[i]
|
|
// not duplicate if server restarted in-between
|
|
if row.kind == EventSelfUp || row.kind == EventSelfDown {
|
|
break
|
|
}
|
|
if row.kind == new_row.kind && row.data.Url == new_row.data.Url {
|
|
is_duplicate = row.data == new_row.data
|
|
break
|
|
}
|
|
}
|
|
if !is_duplicate {
|
|
db.rows = append(db.rows, new_row)
|
|
non_duplicates = append(non_duplicates, v)
|
|
}
|
|
}
|
|
return non_duplicates, nil
|
|
}
|
|
|
|
func (db *dumbDatabase) StoreSelfStatus(status EventType, when time.Time) error {
|
|
db.mutex.Lock()
|
|
defer db.mutex.Unlock()
|
|
|
|
if status != EventSelfUp && status != EventSelfDown {
|
|
log.Panicf("(StoreSelfStatus) invalid value: %v", status)
|
|
}
|
|
var placeholder ServiceStatus
|
|
new_row := Row{when, EventType(status), placeholder}
|
|
db.rows = append(db.rows, new_row)
|
|
if len(db.rows) > DbEntriesToKeep {
|
|
db.rows = db.rows[len(db.rows)-DbEntriesToKeep:]
|
|
}
|
|
return nil
|
|
}
|