Kela/server/dht.go

270 lines
5.7 KiB
Go

package main
import (
"bytes"
"crypto/ed25519"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
)
var myHash string
var myPos int
var hashToDomain map[string]string
var peerHashes []string
var kvstore map[string][]byte
// Get the sha256sum of string as a URL-safe unpadded base64 string
func sha256sum(s string) string {
b := sha256.Sum256([]byte(s))
return base64.RawURLEncoding.EncodeToString(b[:])
}
// Try to peer with another server
func addPeer(peer string) error {
peerHash := sha256sum(peer)
// Check if already peered
mu.Lock()
_, ok := hashToDomain[peerHash]
mu.Unlock()
if ok {
return nil
}
mu.Lock()
hashToDomain[peerHash] = peer
mu.Unlock()
// Try request to peer
log.Printf("%s trying to peer with %s", me, peer)
resp, err := http.Get(peer + "/peer?peer=" + me)
if err != nil {
// Request failed, delete peer
mu.Lock()
delete(hashToDomain, peerHash)
mu.Unlock()
return err
}
log.Printf("%s successfully peered with %s", me, peer)
mu.Lock()
i := sort.SearchStrings(peerHashes, peerHash)
peerHashes = append(peerHashes, "")
copy(peerHashes[i+1:], peerHashes[i:])
peerHashes[i] = peerHash
myPos = sort.SearchStrings(peerHashes, me)
// TODO: redistribute keys
mu.Unlock()
// Read response body
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
// Try adding all peers of this peer
newPeers := strings.Split(string(body), "\n")
for _, newPeer := range newPeers[:len(newPeers)-1] {
go addPeer(newPeer)
}
return nil
}
// Handle incoming peer requests
func peerHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
peer := r.Form.Get("peer")
if peer == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
for _, p := range hashToDomain {
fmt.Fprintf(w, "%s\n", p)
}
go addPeer(peer)
}
// Find the position of a key in the DHT
func keyPos(key string) int {
keyPos := sort.SearchStrings(peerHashes, sha256sum(key))
if keyPos < myPos {
keyPos += len(peerHashes)
}
return keyPos
}
// Get the timestamp of this val
func timestamp(val []byte) int {
if len(val) < ed25519.SignatureSize {
return 0
}
message := string(val[:len(val)-ed25519.SignatureSize])
timestamp, err := strconv.Atoi(strings.Split(message, "\n")[0])
if err != nil {
return 0
}
return timestamp
}
// Get the value for a key from the DHT
func dhtGet(key string) ([]byte, error) {
keyPos := keyPos(key)
var wg sync.WaitGroup
var latest []byte
for i := 0; i < 5 && i < len(peerHashes); i++ {
wg.Add(1)
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() {
defer wg.Done()
resp, err := http.Get(j + "/dht/" + key + "?direct")
if err != nil {
return
}
val, err := io.ReadAll(resp.Body)
if err != nil {
return
}
err = verify(key, val)
if err != nil {
return
}
if latest == nil || timestamp(val) > timestamp(latest) {
latest = val
}
}()
}
wg.Wait()
if latest == nil {
return nil, errors.New("key not found in kvstore")
}
return latest, nil
}
// Post a key-value pair into the DHT
func dhtPost(key string, val []byte) error {
err := verify(key, val)
if err != nil {
return err
}
keyPos := keyPos(key)
for i := 0; i < 5 && i < len(peerHashes); i++ {
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() {
http.Post(j+"/dht/"+key+"?direct", "application/octet-stream", bytes.NewBuffer(val))
}()
}
return nil
}
// Handle DHT requests
func dhtHandler(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[5:]
r.ParseForm()
if r.Form.Get("direct") != "" {
// Directly modify kvstore
if keyPos(key)-myPos >= 5 {
w.WriteHeader(http.StatusNotFound)
return
}
phase := time.Now().Unix()/600
if r.Method == "GET" {
mu.Lock()
val, ok := kvstore[key+"\n"+fmt.Sprint(phase)]
mu.Unlock()
if !ok || verify(key, val) != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(val)
} else if r.Method == "POST" {
val, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
mu.Lock()
// Update key for this phase and next one
kvstore[key+"\n"+fmt.Sprint(phase)] = val
kvstore[key+"\n"+fmt.Sprint(phase+1)] = val
mu.Unlock()
}
return
}
if r.Method == "GET" {
val, err := dhtGet(key)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write([]byte(val))
} else if r.Method == "POST" {
val, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = dhtPost(key, val)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
}
// Clean out offline peers
func cleanPeers() {
for true {
mu.Lock()
peer := hashToDomain[peerHashes[rand.Intn(len(peerHashes))]]
mu.Unlock()
_, err := http.Get(peer)
if err != nil {
// Bad response, so remove peer
mu.Lock()
i := sort.SearchStrings(peerHashes, sha256sum(peer))
peerHashes = append(peerHashes[:i], peerHashes[i+1:]...)
myPos = sort.SearchStrings(peerHashes, me)
// TODO: redistribute keys
mu.Unlock()
}
time.Sleep(5 * time.Second)
}
}
// Clean out old keys from the KVStore every minute
func cleanKVStore() {
for true {
// This locks the mutex for a while which sucks
mu.Lock()
for key := range kvstore {
timestamp, err := strconv.Atoi(strings.Split(key, "\n")[1])
if err != nil || int64(timestamp)+1 < time.Now().Unix()/600 {
delete(kvstore, key)
}
}
mu.Unlock()
time.Sleep(time.Minute)
}
}
// Redistribute key-value pairs periodically
func redistributeKeys() {
for true {
for id, user := range users {
dhtPost(id, user.dhtVal)
}
time.Sleep(time.Duration(rand.Intn(300)) * time.Second)
}
}