Almost done with DHT

This commit is contained in:
Anthony Wang 2023-05-11 16:31:57 -04:00
parent 45e6568541
commit d87d4d6492
Signed by: a
GPG key ID: 42A5B952E6DD8D38
4 changed files with 156 additions and 68 deletions

View file

@ -16,7 +16,7 @@ Alright, let's solve all those problems above with Kela! Kela consists of three
### Name resolution service
In Kela, each user has an ID, which is a public key. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online.
In Kela, each user has an ID, which is an Ed5519 public key encoded in URL-safe Base64. Each user is associated with one or more Kela servers, which store that user's data. To find out which servers a user is associated with, you can query the name resolution system. All Kela servers participate in the name resolution system and act as DHT nodes. Each server stores a complete list of all DHT nodes. When a new server joins the DHT, it tries to peer with an existing server in the DHT. Say server `example.com` would like to peer with `test.net`. `example.com` first sends a GET request to `test.net/peer?peer=example.com`. `test.net` replies with its list of DHT nodes. Once `example.com` receives this reply, it adds `test.net` to its list of DHT nodes and attempts to peer with all servers in the reply that it hasn't peered with yet. `test.net` now also tries to peer with the server that just contacted it, in this case `example.com`. Servers periodically go through their list of DHT nodes and remove nodes that are no longer online.
The DHT stores key-value pairs. The key consists of a user's public key and timestamp (the current Unix time in seconds divided by 600, rounded down). The value consists of a timestamp (the current Unix time in seconds), a list of servers that the user is associated with, where the first server is their primary server, and a signature. A key-value pair is assigned to the 5 servers with smallest SHA-256 hashes of their domain name greater than the SHA-256 hash of the key. The purpose of the elaborate timestamp in the key is to ensure that the set of servers assigned to a key-value pair rotates every 600 seconds so an attacker must control a very large portion of the DHT to do a denial-of-service attack against a specific key-value pair. When servers join and leave the DHT, the servers that a user is associated with will ensure that that user's key-value pair is assigned to a new server if necessary to ensure that 5 servers store that key-value pair. The DHT supports two operations, get and put. For put operations, the server checks the signature to ensure the validity of the request. When a server receives either of these two operations, it computes the SHA-256 hash of the key and checks if it is supposed to store that key-value pair or not. If it is supposed to store that key-value pair, it performs the operation on that pair. Otherwise, the server will contact in parallel the 5 servers that store this key-value pair. If the operation is a get, the server will look at the 5 replies and return the value with the most recent timestamp. If the operation is a put, and one of the 5 parallel requests fails, the server will remove that offline server from its DHT node list and assign a new server to this key-value pair to replace the offline one. Each server periodically goes through its stored key-value pairs and deletes old ones.

View file

@ -1,16 +1,27 @@
package main
import (
"crypto/ed25519"
"bytes"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// Get the sha256sum of string as a URL-safe unpadded base64 string
func sha256sum(s string) string {
b := sha256.Sum256([]byte(s))
return base64.RawURLEncoding.EncodeToString(b[:])
}
// Try to peer with another server
func addPeer(peer string) error {
peerHash := sha256sum(peer)
@ -71,69 +82,123 @@ func peerHandler(w http.ResponseWriter, r *http.Request) {
go addPeer(peer)
}
// Handle DHT requests
func dhtHandler(w http.ResponseWriter, r *http.Request) {
key := r.URL.String()[5:]
keyHash := sha256sum(key)
pubKey := asPubKey(key)
fmt.Println(key, keyHash, pubKey)
mu.Lock()
keyPos := sort.SearchStrings(peerHashes, keyHash)
// Find the position of a key in the DHT
func keyPos(key string) int {
keyPos := sort.SearchStrings(peerHashes, sha256sum(key))
if keyPos < myPos {
keyPos += len(peerHashes)
}
mu.Unlock()
if r.Method == "GET" {
if keyPos - myPos < 5 {
mu.Lock()
if val, ok := kvstore[key]; ok {
w.Write([]byte(val))
} else {
w.WriteHeader(http.StatusNotFound)
return keyPos
}
// Get the value for a key from the DHT
func dhtGet(key string) ([]byte, error) {
keyPos := keyPos(key)
var wg sync.WaitGroup
var ret []byte
bestTime := 0
for i := 0; i < 5 && i < len(peerHashes); i++ {
wg.Add(1)
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() {
defer wg.Done()
resp, err := http.Get(j + "/dht/" + key + "?direct")
if err != nil {
// TODO: Remove this server from DHT?
// For sanity reasons this might be a bad idea
return
}
mu.Unlock()
} else {
for i := 0; i < 5 && i < len(peerHashes); i++ {
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() string {
resp, err := http.Get(j + r.URL.String())
if err != nil {
return ""
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return ""
}
return string(b)
}()
b, err := io.ReadAll(resp.Body)
if err != nil {
return
}
err = verify(key, b)
if err != nil {
return
}
valTime, err := strconv.Atoi(strings.Split(string(b), "\n")[0])
if err != nil {
return
}
if ret == nil || valTime > bestTime {
ret = b
bestTime = valTime
}
}()
}
wg.Wait()
if ret == nil {
return nil, errors.New("id not in kvstore")
}
return ret, nil
}
// Put a key-value pair into the DHT
func dhtPut(key string, val []byte) error {
err := verify(key, val)
if err != nil {
return err
}
keyPos := keyPos(key)
for i := 0; i < 5 && i < len(peerHashes); i++ {
j := hashToDomain[peerHashes[(keyPos+i)%len(peerHashes)]]
go func() {
http.Post(j + "/dht/" + key + "?direct", "application/octet-stream", bytes.NewBuffer(val))
}()
}
return nil
}
// Handle DHT requests
func dhtHandler(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[5:]
r.ParseForm()
if r.Form.Get("direct") != "" {
// Directly modify kvstore
if keyPos(key) - myPos >= 5 {
w.WriteHeader(http.StatusNotFound)
return
}
} else if r.Method == "PUT" {
// Read request body
b, err := io.ReadAll(r.Body)
if r.Method == "GET" {
mu.Lock()
val, ok := kvstore[key + fmt.Sprint(time.Now().Unix() / 600)]
mu.Unlock()
if !ok || verify(key, val) != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write(val)
} else if r.Method == "POST" {
val, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
mu.Lock()
kvstore[key + fmt.Sprint(time.Now().Unix() / 600)] = val
mu.Unlock()
}
return
}
if r.Method == "GET" {
val, err := dhtGet(key)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Write([]byte(val))
} else if r.Method == "POST" {
val, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// Extract signature
valSplit := strings.Split(string(b), "\n")
sig := valSplit[len(valSplit)-1]
// Verify signature
if !ed25519.Verify(pubKey, b[:len(b)-len(sig)-1], []byte(sig)) {
w.WriteHeader(http.StatusUnauthorized)
err = dhtPut(key, val)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if keyPos - myPos < 5 {
mu.Lock()
kvstore[key] = string(b[:len(b)-len(sig)-1])
mu.Unlock()
} else {
}
} else {
w.WriteHeader(http.StatusMethodNotAllowed)
w.WriteHeader(http.StatusOK)
}
}

View file

@ -12,6 +12,7 @@ import (
type user struct {
pubkey []byte
servers []string
}
var mu sync.Mutex
@ -20,22 +21,7 @@ var myHash string
var myPos int
var hashToDomain map[string]string
var peerHashes []string
var kvstore map[string]string
// Get the sha256sum of string as a URL-safe unpadded base64 string
func sha256sum(s string) string {
b := sha256.Sum256([]byte(s))
return base64.RawURLEncoding.EncodeToString(b[:])
}
// Decode an ID to a public key
func asPubKey(s string) ed25519.PublicKey {
b, err := base64.RawURLEncoding.DecodeString(s)
if err != nil {
return nil
}
return ed25519.PublicKey(b)
}
var kvstore map[string][]byte
func main() {
bindAddr := flag.String("b", ":4200", "bind address")

37
server/user.go Normal file
View file

@ -0,0 +1,37 @@
package main
import (
"crypto/ed25519"
"encoding/base64"
"errors"
"net/http"
)
func verify(id string, body []byte) error {
b, err := base64.RawURLEncoding.DecodeString(id)
if err != nil {
return err
}
if len(body) < ed25519.SignatureSize {
return errors.New("body too short")
}
message := body[:len(body)-ed25519.SignatureSize]
sig := body[len(body)-ed25519.SignatureSize:]
if !ed25519.Verify(ed25519.PublicKey(b), message, sig) {
return errors.New("signature verification failed")
}
return nil
}
// Create user
func createHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
id := r.Form.Get("id")
dhtGet(id)
}
// Delete user
func deleteHandler(w http.ResponseWriter, r *http.Request) {
}