Kela/server/storage.go

189 lines
4 KiB
Go

package main
import (
"crypto/ed25519"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
// Replicate a user's log to another server
func replicate(id, s string) {
log.Printf("Starting replication for %s %s", id, s)
for true {
mu.Lock()
// Make sure that this server is still the primary for this user
user, ok := users[id]
if !ok {
mu.Unlock()
return
}
if me != user.servers[0] {
user.nextIndex = nil
mu.Unlock()
return
}
// Make sure that the target server is still associated with this user
idx, ok := user.nextIndex[s]
if !ok {
mu.Unlock()
return
}
if idx == len(user.log) {
// Up to date
mu.Unlock()
time.Sleep(50 * time.Millisecond)
continue
}
op := user.log[idx]
mu.Unlock()
file, _ := os.Open(op)
resp, err := http.Post(s+"/storage/"+id+"/"+op+"?idx="+fmt.Sprint(idx), "application/octet-stream", file)
if err != nil {
time.Sleep(50 * time.Millisecond)
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
time.Sleep(50 * time.Millisecond)
continue
}
mu.Lock()
user.nextIndex[s], _ = strconv.Atoi(string(b))
mu.Unlock()
}
}
// Handle storage requests
func storageHandler(w http.ResponseWriter, r *http.Request) {
pathSplit := strings.Split(r.URL.Path, "/")
id := pathSplit[2]
filename := pathSplit[3]
r.ParseForm()
if r.Method == "GET" {
if r.Form.Has("direct") {
// Directly read and respond with file
file, err := os.ReadFile(dataDir + "/" + id + "/" + filename)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(file)
return
}
val := dhtGet(id, false)
err := verify(id, val)
if err != nil {
verify(id, val)
w.WriteHeader(http.StatusNotFound)
return
}
if _, ok := users[id]; ok {
reconfigure(id, val)
}
servers := strings.Split(string(val[8:len(val)-ed25519.SignatureSize]), "\n")
if servers[0] == me {
file, err := os.ReadFile(dataDir + "/" + id + "/" + filename)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(file)
return
}
for _, server := range servers {
resp, err := http.Get(server + "/storage/" + id + "/" + filename)
if err != nil {
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
continue
}
w.Write(b)
return
}
w.WriteHeader(http.StatusNotFound)
} else if r.Method == "POST" {
mu.Lock()
defer mu.Unlock()
user, ok := users[id]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
b, err := io.ReadAll(r.Body)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = verify(id, b)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusUnauthorized)
return
}
if r.Form.Has("idx") {
idx, err := strconv.Atoi(r.Form.Get("idx"))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if idx > len(user.log) {
// Missing log entries
w.Write([]byte(fmt.Sprint(len(user.log))))
return
}
if idx < len(user.log) {
// Too many log entries
ops := make(map[string]interface{})
for i := idx; i < len(user.log); i++ {
ops[user.log[i]] = nil
}
for op := range ops {
// Fetch older version of file
resp, err := http.Get(user.servers[0] + "/storage/" + id + "/" + op)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = os.WriteFile(dataDir+"/"+id+"/"+op, b, 0644)
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
user.log = user.log[:idx]
}
}
err = os.WriteFile(dataDir+"/"+id+"/"+filename, b, 0644)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
user.log = append(user.log, filename)
}
}