Add database helpers. Comment it.
This commit is contained in:
parent
2c81432664
commit
47a646e78b
1 changed files with 71 additions and 14 deletions
|
@ -26,24 +26,76 @@ type Row struct {
|
|||
type EventType int
|
||||
|
||||
const (
|
||||
EventService = iota
|
||||
// represents when a servire's status changes
|
||||
EventService EventType = iota
|
||||
// represents when this program starts
|
||||
EventSelfUp
|
||||
// represents when this program stops
|
||||
EventSelfDown
|
||||
)
|
||||
|
||||
// either EventSelfUp or EventSelfDown
|
||||
// I want real enum...
|
||||
type SelfStatusUpdate EventType
|
||||
|
||||
type Database interface {
|
||||
// dump data to disk
|
||||
Persist() error
|
||||
|
||||
// get all records after some time (`cutoff`)
|
||||
QueryAllAfter(url string, cutoff time.Time) ([]Row, error)
|
||||
//
|
||||
// Usage: (get past 7 days of history)
|
||||
// db.QueryAllAfter(time.Now().Add(-7 * 24 * time.Hour))
|
||||
//
|
||||
// Then, parse the result with `GetServiceStatusSpans(rows, service_url)`
|
||||
QueryAllAfter(cutoff time.Time) ([]Row, error)
|
||||
|
||||
// store service status records with timestamp
|
||||
StoreServiceStatusBatch(entries scanner.ServiceStatusBatch, when time.Time) error
|
||||
|
||||
// store self up/down event with timestamp
|
||||
StoreSelfStatus(status SelfStatusUpdate, when time.Time) error
|
||||
//
|
||||
// `status` is EventSelfUp or EventSelfDown
|
||||
StoreSelfStatus(status EventType, when time.Time) error
|
||||
}
|
||||
|
||||
type Span struct {
|
||||
start time.Time
|
||||
stop time.Time
|
||||
status scanner.ServiceStatus
|
||||
}
|
||||
|
||||
func append_close_span(res *[]Span, rows []Row, start_i, stop_i int, status scanner.ServiceStatus) {
|
||||
var stop_time time.Time
|
||||
if stop_i < 0 {
|
||||
stop_time = time.Now()
|
||||
} else {
|
||||
stop_time = rows[stop_i].time
|
||||
}
|
||||
start_time := rows[start_i].time
|
||||
*res = append(*res, Span{start_time, stop_time, status})
|
||||
}
|
||||
|
||||
// Find when the service is up, and when the service is down
|
||||
// returns timespans. There might be gaps between two spans, meaning that we don't know the state of that service
|
||||
func CalculateServiceStatusSpans(rows []Row, service_url string) []Span {
|
||||
res := []Span{}
|
||||
start_i := -1
|
||||
var status scanner.ServiceStatus
|
||||
|
||||
for i, v := range rows {
|
||||
switch v.kind {
|
||||
case EventSelfDown:
|
||||
append_close_span(&res, rows, start_i, i, status)
|
||||
start_i = -1
|
||||
case EventService:
|
||||
if v.data.Url == service_url {
|
||||
append_close_span(&res, rows, start_i, i, status)
|
||||
start_i = i
|
||||
status = v.data
|
||||
}
|
||||
}
|
||||
}
|
||||
append_close_span(&res, rows, start_i, -1, status)
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
type dumbDatabase struct {
|
||||
|
@ -52,6 +104,7 @@ type dumbDatabase struct {
|
|||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// Load the newest backup, or start anew
|
||||
func OpenDatabase(dir string) (Database, error) {
|
||||
if dir == "" {
|
||||
return nil, fmt.Errorf("database path is empty. did you set it in config file?")
|
||||
|
@ -75,7 +128,7 @@ func OpenDatabase(dir string) (Database, error) {
|
|||
}
|
||||
_, err := ulid.Parse(filename)
|
||||
if err != nil {
|
||||
continue
|
||||
continue // because database files have ULID as names
|
||||
}
|
||||
fullpath := path.Join(dir, filename)
|
||||
content, err := os.ReadFile(fullpath)
|
||||
|
@ -103,12 +156,16 @@ func (db *dumbDatabase) sortRows() {
|
|||
})
|
||||
}
|
||||
|
||||
// Dump data to file
|
||||
//
|
||||
// Steps:
|
||||
// create -> write -> fsync -> rename
|
||||
func (db *dumbDatabase) Persist() error {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
db.sortRows()
|
||||
id := ulid.Make().String()
|
||||
id := ulid.Make().String() // checkpoint filename is ULID
|
||||
|
||||
filename_final := path.Join(db.rootDir, id)
|
||||
filename_tmp := path.Join(db.rootDir, id+".tmp")
|
||||
|
@ -117,7 +174,7 @@ func (db *dumbDatabase) Persist() error {
|
|||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
err = json.NewEncoder(file).Encode(db.rows)
|
||||
err = json.NewEncoder(file).Encode(db.rows) // file content is JSON
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -133,7 +190,7 @@ func (db *dumbDatabase) Persist() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *dumbDatabase) QueryAllAfter(url string, cutoff time.Time) ([]Row, error) {
|
||||
func (db *dumbDatabase) QueryAllAfter(cutoff time.Time) ([]Row, error) {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
|
@ -183,7 +240,7 @@ func (db *dumbDatabase) StoreServiceStatusBatch(entries scanner.ServiceStatusBat
|
|||
|
||||
const max_rows = 10000
|
||||
|
||||
func (db *dumbDatabase) StoreSelfStatus(status SelfStatusUpdate, when time.Time) error {
|
||||
func (db *dumbDatabase) StoreSelfStatus(status EventType, when time.Time) error {
|
||||
db.mutex.Lock()
|
||||
defer db.mutex.Unlock()
|
||||
|
||||
|
|
Loading…
Reference in a new issue