Finish storage server implementation

This commit is contained in:
Anthony Wang 2023-05-12 21:28:48 -04:00
parent ae29c8b047
commit 1695257614
Signed by: a
GPG key ID: 42A5B952E6DD8D38
6 changed files with 351 additions and 126 deletions

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/ed25519" "crypto/ed25519"
"encoding/base64" "encoding/base64"
"encoding/binary"
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
@ -19,10 +20,25 @@ var servers []string
// Post new server list to DHT // Post new server list to DHT
func dhtPost(s string) { func dhtPost(s string) {
message := []byte(fmt.Sprint(time.Now().Unix()) + "\n" + strings.Join(servers, "\n")) buf := new(bytes.Buffer)
fmt.Print(message) err := binary.Write(buf, binary.LittleEndian, time.Now().Unix())
message = append(message, ed25519.Sign(privKey, message)...) if err != nil {
http.Post(s+"/dht/"+id, "application/octet-stream", bytes.NewBuffer(message)) panic(err)
}
_, err = buf.WriteString(strings.Join(servers, "\n"))
if err != nil {
panic(err)
}
var message []byte
_, err = buf.Read(message)
if err != nil {
panic(err)
}
_, err = buf.Write(ed25519.Sign(privKey, message))
if err != nil {
panic(err)
}
fmt.Println(http.Post(s+"/dht/"+id, "application/octet-stream", buf))
} }
func main() { func main() {
@ -55,19 +71,22 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
pubKey := ed25519.PublicKey(pubKeyBytes) pubKey = ed25519.PublicKey(pubKeyBytes)
privKeyBytes, err := os.ReadFile("privkey") privKeyBytes, err := os.ReadFile("privkey")
if err != nil { if err != nil {
panic(err) panic(err)
} }
privKey := ed25519.PublicKey(privKeyBytes) privKey = ed25519.PrivateKey(privKeyBytes)
serversBytes, err := os.ReadFile("servers") serversBytes, err := os.ReadFile("servers")
if err != nil { if err != nil {
panic(err) panic(err)
} }
id = base64.RawURLEncoding.EncodeToString(pubKey) id = base64.RawURLEncoding.EncodeToString(pubKey)
servers := strings.Split(string(serversBytes), "\n") servers = strings.Split(string(serversBytes), "\n")
fmt.Println(pubKey, privKey, servers) if servers[0] == "" {
servers = servers[1:]
}
fmt.Println(id, servers)
if flag.Arg(0) == "add" { if flag.Arg(0) == "add" {
// Add server // Add server

View file

@ -5,6 +5,7 @@ import (
"crypto/ed25519" "crypto/ed25519"
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/binary"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -29,9 +30,19 @@ func sha256sum(s string) string {
return base64.RawURLEncoding.EncodeToString(b[:]) return base64.RawURLEncoding.EncodeToString(b[:])
} }
// Find the position of a key in the DHT
func keyPos(key string) int {
keyPos := sort.SearchStrings(peerHashes, sha256sum(key))
if keyPos < myPos {
keyPos += len(peerHashes)
}
return keyPos
}
// Try to peer with another server // Try to peer with another server
func addPeer(peer string) error { func addPeer(peer string) error {
peerHash := sha256sum(peer) peerHash := sha256sum(peer)
// Check if already peered // Check if already peered
mu.Lock() mu.Lock()
_, ok := hashToDomain[peerHash] _, ok := hashToDomain[peerHash]
@ -44,7 +55,6 @@ func addPeer(peer string) error {
mu.Unlock() mu.Unlock()
// Try request to peer // Try request to peer
log.Printf("%s trying to peer with %s", me, peer)
resp, err := http.Get(peer + "/peer?peer=" + me) resp, err := http.Get(peer + "/peer?peer=" + me)
if err != nil { if err != nil {
// Request failed, delete peer // Request failed, delete peer
@ -54,25 +64,25 @@ func addPeer(peer string) error {
return err return err
} }
log.Printf("%s successfully peered with %s", me, peer) // Add peer
mu.Lock() mu.Lock()
i := sort.SearchStrings(peerHashes, peerHash) i := sort.SearchStrings(peerHashes, peerHash)
peerHashes = append(peerHashes, "") peerHashes = append(peerHashes, "")
copy(peerHashes[i+1:], peerHashes[i:]) copy(peerHashes[i+1:], peerHashes[i:])
peerHashes[i] = peerHash peerHashes[i] = peerHash
myPos = sort.SearchStrings(peerHashes, me) myPos = sort.SearchStrings(peerHashes, myHash)
// Distribute keys to new server // Distribute keys to new server
for id, user := range users { for id, user := range users {
phase := time.Now().Unix() / 600 phase := time.Now().Unix() / 600
if keyPos(id + "\n" + fmt.Sprint(phase))-myPos < 5 { if keyPos(id+"\n"+fmt.Sprint(phase))-myPos < 5 {
go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal))
} }
if keyPos(id + "\n" + fmt.Sprint(phase+1))-myPos < 5 { if keyPos(id+"\n"+fmt.Sprint(phase+1))-myPos < 5 {
go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) go http.Post(peer+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal))
} }
} }
mu.Unlock() mu.Unlock()
log.Printf("%s successfully peered with %s", me, peer)
// Read response body // Read response body
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
@ -95,39 +105,27 @@ func peerHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
mu.Lock()
for _, p := range hashToDomain { for _, p := range hashToDomain {
fmt.Fprintf(w, "%s\n", p) fmt.Fprintf(w, "%s\n", p)
} }
mu.Unlock()
go addPeer(peer) go addPeer(peer)
} }
// Find the position of a key in the DHT
func keyPos(key string) int {
keyPos := sort.SearchStrings(peerHashes, sha256sum(key))
if keyPos < myPos {
keyPos += len(peerHashes)
}
return keyPos
}
// Get the timestamp of this val // Get the timestamp of this val
func timestamp(val []byte) int { func timestamp(val []byte) int64 {
if len(val) < ed25519.SignatureSize { if len(val) < 8+ed25519.SignatureSize {
return 0 return 0
} }
message := string(val[:len(val)-ed25519.SignatureSize]) ret, _ := binary.Varint(val[:8])
timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0]) return ret
if err != nil {
return 0
}
return timestamp
} }
// Get the value for a key from the DHT // Get the value for a key from the DHT
func dhtGet(key, direct string) []byte { func dhtGet(key string, direct bool) []byte {
phase := fmt.Sprint(time.Now().Unix() / 600) phase := fmt.Sprint(time.Now().Unix() / 600)
keyPos := keyPos(key + "\n" + phase) if direct {
if direct != "" && keyPos-myPos < 5 {
// Directly read from kvstore // Directly read from kvstore
mu.Lock() mu.Lock()
val, ok := kvstore[key+"\n"+phase] val, ok := kvstore[key+"\n"+phase]
@ -139,12 +137,14 @@ func dhtGet(key, direct string) []byte {
} }
// Contact 5 servers that store this key-value pair // Contact 5 servers that store this key-value pair
var mu sync.Mutex var mu2 sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var latest []byte var latest []byte
mu.Lock()
keyPos := keyPos(key + "\n" + phase)
for i := 0; i < 5 && i < len(peerHashes); i++ { for i := 0; i < 5 && i < len(peerHashes); i++ {
wg.Add(1) wg.Add(1)
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]]
go func() { go func() {
defer wg.Done() defer wg.Done()
resp, err := http.Get(j + "/dht/" + key + "?direct") resp, err := http.Get(j + "/dht/" + key + "?direct")
@ -159,20 +159,21 @@ func dhtGet(key, direct string) []byte {
if err != nil { if err != nil {
return return
} }
mu.Lock() mu2.Lock()
if latest == nil || timestamp(val) > timestamp(latest) { if latest == nil || timestamp(val) > timestamp(latest) {
latest = val latest = val
} }
mu.Unlock() mu2.Unlock()
}() }()
} }
mu.Unlock()
// Wait for all to finish or time out // Wait for all to finish or time out
wg.Wait() wg.Wait()
return latest return latest
} }
// Post a key-value pair into the DHT // Post a key-value pair into the DHT
func dhtPost(key, phase, direct string, val []byte) error { func dhtPost(key, phase string, direct bool, val []byte) error {
err := verify(key, val) err := verify(key, val)
if err != nil { if err != nil {
return err return err
@ -180,21 +181,8 @@ func dhtPost(key, phase, direct string, val []byte) error {
if phase == "" { if phase == "" {
phase = fmt.Sprint(time.Now().Unix() / 600) phase = fmt.Sprint(time.Now().Unix() / 600)
} }
user, ok := users[key]
if ok {
curPhase, err := strconv.Atoi(phase)
if err != nil {
return err
}
nextPhase := time.Now().Unix()/600 + 1
if int64(curPhase) < nextPhase && user.phase < nextPhase {
user.phase = nextPhase
go dhtPost(key, fmt.Sprint(nextPhase), "", val)
}
}
keyPos := keyPos(key + "\n" + phase) if direct {
if direct != "" && keyPos-myPos < 5 {
// Directly write to kvstore // Directly write to kvstore
mu.Lock() mu.Lock()
curVal, ok := kvstore[key+"\n"+phase] curVal, ok := kvstore[key+"\n"+phase]
@ -205,11 +193,31 @@ func dhtPost(key, phase, direct string, val []byte) error {
return nil return nil
} }
// Contact 5 servers that store this key-value pair // Post the key-value pair to the next phase if necessary
for i := 0; i < 5 && i < len(peerHashes); i++ { mu.Lock()
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]] curPhase, err := strconv.Atoi(phase)
go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct=true", "application/octet-stream", bytes.NewBuffer(val)) if err != nil {
return err
} }
nextPhase := time.Now().Unix()/600 + 1
if int64(curPhase) < nextPhase {
user, ok := users[key]
if ok && user.phase < nextPhase {
user.phase = nextPhase
persist(key)
}
go dhtPost(key, fmt.Sprint(nextPhase), false, val)
}
keyPos := keyPos(key + "\n" + phase)
mu.Unlock()
// Contact 5 servers that store this key-value pair
mu.Lock()
for i := 0; i < 5 && i < len(peerHashes); i++ {
j := hashToDomain[peerHashes[(keyPos-i+len(peerHashes))%len(peerHashes)]]
go http.Post(j+"/dht/"+key+"?phase="+phase+"&direct", "application/octet-stream", bytes.NewBuffer(val))
}
mu.Unlock()
return nil return nil
} }
@ -218,7 +226,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[5:] key := r.URL.Path[5:]
r.ParseForm() r.ParseForm()
if r.Method == "GET" { if r.Method == "GET" {
val := dhtGet(key, r.Form.Get("direct")) val := dhtGet(key, r.Form.Has("direct"))
if val == nil { if val == nil {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
@ -226,7 +234,7 @@ func dhtHandler(w http.ResponseWriter, r *http.Request) {
w.Write(val) w.Write(val)
} else if r.Method == "POST" { } else if r.Method == "POST" {
val, err := io.ReadAll(r.Body) val, err := io.ReadAll(r.Body)
if err != nil || dhtPost(key, r.Form.Get("phase"), r.Form.Get("direct"), val) != nil { if err != nil || dhtPost(key, r.Form.Get("phase"), r.Form.Has("direct"), val) != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
@ -242,11 +250,13 @@ func cleanPeers() {
mu.Unlock() mu.Unlock()
_, err := http.Get(peer) _, err := http.Get(peer)
if err != nil { if err != nil {
log.Printf("Removing peer %s", peer)
// Bad response, so remove peer // Bad response, so remove peer
mu.Lock() mu.Lock()
i := sort.SearchStrings(peerHashes, sha256sum(peer)) i := sort.SearchStrings(peerHashes, sha256sum(peer))
peerHashes = append(peerHashes[:i], peerHashes[i+1:]...) peerHashes = append(peerHashes[:i], peerHashes[i+1:]...)
myPos = sort.SearchStrings(peerHashes, me) myPos = sort.SearchStrings(peerHashes, myHash)
// Distribute keys on this server to other servers // Distribute keys on this server to other servers
if len(peerHashes) >= 5 { if len(peerHashes) >= 5 {
@ -254,13 +264,13 @@ func cleanPeers() {
phase := time.Now().Unix() / 600 phase := time.Now().Unix() / 600
kpos := keyPos(id + "\n" + fmt.Sprint(phase)) kpos := keyPos(id + "\n" + fmt.Sprint(phase))
if kpos-i < 5 { if kpos-i < 5 {
server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]]
go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal))
} }
kpos = keyPos(id + "\n" + fmt.Sprint(phase+1)) kpos = keyPos(id + "\n" + fmt.Sprint(phase+1))
if kpos-i < 5 { if kpos-i < 5 {
server := hashToDomain[peerHashes[(kpos+4)%len(peerHashes)]] server := hashToDomain[peerHashes[(kpos-4+len(peerHashes))%len(peerHashes)]]
go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct=true", "application/octet-stream", bytes.NewBuffer(user.dhtVal)) go http.Post(server+"/dht/"+id+"?phase="+fmt.Sprint(phase+1)+"&direct", "application/octet-stream", bytes.NewBuffer(user.dhtVal))
} }
} }
} }
@ -293,8 +303,10 @@ func redistributeKeys() {
for id, user := range users { for id, user := range users {
nextPhase := time.Now().Unix()/600 + 1 nextPhase := time.Now().Unix()/600 + 1
if user.phase < nextPhase { if user.phase < nextPhase {
go dhtPost(id, fmt.Sprint(nextPhase), "", user.dhtVal) go dhtPost(id, fmt.Sprint(nextPhase), false, user.dhtVal)
} }
user.phase = nextPhase
persist(id)
} }
mu.Unlock() mu.Unlock()
time.Sleep(time.Duration(rand.Intn(300)) * time.Second) time.Sleep(time.Duration(rand.Intn(300)) * time.Second)

View file

@ -16,6 +16,16 @@ var me string
var initialPeer string var initialPeer string
var dataDir string var dataDir string
type LoggingHandler struct {
Handler http.Handler
}
// Log all HTTP requests
func (lh LoggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("Request: %s %s%s", r.Method, me, r.URL.String())
lh.Handler.ServeHTTP(w, r)
}
func main() { func main() {
flag.StringVar(&bindAddr, "b", ":4200", "bind address") flag.StringVar(&bindAddr, "b", ":4200", "bind address")
flag.StringVar(&me, "u", "http://localhost:4200", "public URL") flag.StringVar(&me, "u", "http://localhost:4200", "public URL")
@ -29,6 +39,27 @@ func main() {
myPos = 0 myPos = 0
peerHashes = append(peerHashes, sha256sum(me)) peerHashes = append(peerHashes, sha256sum(me))
hashToDomain = map[string]string{peerHashes[0]: me} hashToDomain = map[string]string{peerHashes[0]: me}
kvstore = make(map[string][]byte)
users = make(map[string]user)
// Load user data from disk
os.Mkdir(dataDir, 0755)
entries, err := os.ReadDir(dataDir)
if err != nil {
log.Fatal(err)
}
for _, entry := range entries {
id := entry.Name()
reader, err := os.Open(dataDir + "/" + id + "/gob")
if err != nil {
log.Fatal(err)
continue
}
var user user
dec := gob.NewDecoder(reader)
dec.Decode(&user)
users[id] = user
}
// Start background functions // Start background functions
if initialPeer != "" { if initialPeer != "" {
@ -38,34 +69,14 @@ func main() {
go cleanKVStore() go cleanKVStore()
go redistributeKeys() go redistributeKeys()
// Load user data from disk
err := os.Mkdir(dataDir, 0755)
if err != nil {
log.Fatal(err)
}
entries, err := os.ReadDir(dataDir)
if err != nil {
log.Fatal(err)
}
for _, entry := range entries {
id := entry.Name()
reader, err := os.Open(dataDir + "/" + id + "/gob")
if err != nil {
continue
}
var user user
dec := gob.NewDecoder(reader)
dec.Decode(&user)
users[id] = user
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello! This is a Kela server.") fmt.Fprintf(w, "Hello! This is a Kela server.")
}) })
http.HandleFunc("/peer", peerHandler) http.HandleFunc("/peer", peerHandler)
http.HandleFunc("/user", userHandler) http.HandleFunc("/user/", userHandler)
http.HandleFunc("/dht", dhtHandler) http.HandleFunc("/dht/", dhtHandler)
http.HandleFunc("/storage", storageHandler) http.HandleFunc("/storage/", storageHandler)
http.HandleFunc("/message", messageHandler) http.HandleFunc("/message/", messageHandler)
log.Fatal(http.ListenAndServe(bindAddr, nil)) log.Fatal(http.ListenAndServe(bindAddr, LoggingHandler{Handler: http.DefaultServeMux}))
} }

View file

@ -1,19 +1,176 @@
package main package main
import ( import (
"crypto/ed25519"
"fmt"
"io"
"net/http" "net/http"
"os"
"strconv"
"strings"
"time"
) )
// Replicate a user's log to another server
func replicate(id, s string) {
for true {
mu.Lock()
// Make sure that this server is still the primary for this user
user, ok := users[id]
if !ok {
mu.Unlock()
return
}
if me != user.servers[0] {
user.nextIndex = nil
mu.Unlock()
return
}
// Make sure that the target server is still associated with this user
idx, ok := user.nextIndex[s]
if !ok {
mu.Unlock()
return
}
if idx == len(user.log) {
// Up to date
mu.Unlock()
time.Sleep(50 * time.Millisecond)
continue
}
op := user.log[idx]
mu.Unlock()
file, _ := os.Open(op)
resp, err := http.Post(s + "/storage/" + id + "/" + op + "?idx=" + fmt.Sprint(idx), "application/octet-stream", file)
if err != nil {
time.Sleep(50 * time.Millisecond)
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
time.Sleep(50 * time.Millisecond)
continue
}
mu.Lock()
user.nextIndex[s], _ = strconv.Atoi(string(b))
mu.Unlock()
}
}
// Handle storage requests // Handle storage requests
func storageHandler(w http.ResponseWriter, r *http.Request) { func storageHandler(w http.ResponseWriter, r *http.Request) {
// filename := r.URL.String()[5:] pathSplit := strings.Split(r.URL.Path, "/")
id := pathSplit[1]
filename := pathSplit[2]
r.ParseForm()
if r.Method == "GET" { if r.Method == "GET" {
if r.Form.Has("direct") {
// Directly read and respond with file
file, err := os.ReadFile(dataDir + "/" + id + "/" + filename)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(file)
return
}
val := dhtGet(id, false)
if verify(id, val) != nil {
w.WriteHeader(http.StatusNotFound)
return
}
if _, ok := users[id]; ok {
reconfigure(id, val)
}
servers := strings.Split(string(val[8:len(val)-ed25519.SignatureSize]), "\n")
if servers[0] == me {
file, err := os.ReadFile(dataDir + "/" + id + "/" + filename)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(file)
return
}
for _, server := range servers {
resp, err := http.Get(server + "/storage/" + id + "/" + filename)
if err != nil {
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
continue
}
w.Write(b)
return
}
w.WriteHeader(http.StatusNotFound)
} else if r.Method == "POST" {
mu.Lock()
defer mu.Unlock()
user, ok := users[id]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
} else if r.Method == "PUT" { b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if verify(id, b) != nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
} else if r.Method == "DELETE" { if r.Form.Has("idx") {
idx, err := strconv.Atoi(r.Form.Get("idx"))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
} else { if idx > len(user.log) {
w.WriteHeader(http.StatusMethodNotAllowed) // Missing log entries
w.Write([]byte(fmt.Sprint(len(user.log))))
return
}
if idx < len(user.log) {
// Too many log entries
ops := make(map[string]interface{})
for i := idx; i < len(user.log); i++ {
ops[user.log[i]] = nil
}
for op := range ops {
// Fetch older version of file
resp, err := http.Get(user.servers[0] + "/storage/" + id + "/" + op)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
b, err := io.ReadAll(resp.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = os.WriteFile(dataDir + "/" + id + "/" + op, b, 0644)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
}
}
err = os.WriteFile(dataDir + "/" + id + "/" + filename, b, 0644)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
user.log = append(user.log, filename)
} }
} }

View file

@ -1,10 +1,9 @@
#!/bin/bash #!/bin/bash
trap "kill 0" EXIT trap "kill 0" EXIT
go build go run . -d 0 &
./server -d 0 &
for i in $(seq 1 9) for i in $(seq 1 9)
do do
sleep 0.1 sleep 0.2
./server -d $i -b :420$i -u http://localhost:420$i -i http://localhost:420$((i-1)) & go run . -d $i -b :420$i -u http://localhost:420$i -i http://localhost:420$((i-1)) &
done done
wait wait

View file

@ -11,8 +11,11 @@ import (
) )
type user struct { type user struct {
dhtVal []byte dhtVal []byte
phase int64 phase int64
servers []string
log []string
nextIndex map[string]int
} }
var users map[string]user var users map[string]user
@ -36,7 +39,7 @@ func verify(id string, body []byte) error {
// Persist a user's data to disk // Persist a user's data to disk
func persist(id string) { func persist(id string) {
writer, err := os.Open(dataDir + "/" + id + "/gob") writer, err := os.OpenFile(dataDir+"/"+id+"/gob", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
return return
} }
@ -44,37 +47,61 @@ func persist(id string) {
enc.Encode(users[id]) enc.Encode(users[id])
} }
// Reconfigure a user based on a DHT get
func reconfigure(id string, dhtVal []byte) {
mu.Lock()
defer mu.Unlock()
user := users[id]
if timestamp(dhtVal) < timestamp(user.dhtVal) {
return
}
user.dhtVal = dhtVal
servers := strings.Split(string(dhtVal[8:len(dhtVal)-ed25519.SignatureSize]), "\n")
if servers[0] == me {
if user.nextIndex == nil {
user.nextIndex = make(map[string]int)
}
for _, server := range servers {
if _, ok := user.nextIndex[server]; !ok {
user.nextIndex[server] = len(user.log)
go replicate(id, server)
}
}
}
inServers := false
for _, server := range servers {
if server == me {
inServers = true
}
}
persist(id)
if !inServers {
delete(users, id)
_ = os.RemoveAll(id)
}
}
// Handle user configuration changes // Handle user configuration changes
func userHandler(w http.ResponseWriter, r *http.Request) { func userHandler(w http.ResponseWriter, r *http.Request) {
id := r.URL.Fragment[6:] id := r.URL.Path[6:]
// Resolve ID to server list // Resolve ID to server list
val := dhtGet(id, "") val := dhtGet(id, false)
if verify(id, val) != nil { if verify(id, val) != nil {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
// Check if server list contains this server mu.Lock()
message := string(val[:len(val)-ed25519.SignatureSize])
if !strings.Contains(message, me) {
// Delete user if they are no longer associated with this server
delete(users, id)
err := os.RemoveAll(id)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
return
}
//valSplit := strings.Split(message, "\n")
//servers := valSplit[1:len(valSplit)-1]
if _, ok := users[id]; !ok { if _, ok := users[id]; !ok {
// Add user // Add user
users[id] = user{ users[id] = user{dhtVal: val}
dhtVal: val, os.Mkdir(dataDir+"/"+id, 0755)
}
os.Mkdir(id, 0755)
persist(id) persist(id)
} }
mu.Unlock()
reconfigure(id, val)
w.WriteHeader(http.StatusOK)
} }