get rid of 'id' field in messages, use Blake2b hash instead
This commit is contained in:
parent
166dbee97b
commit
3b9aa3f301
|
@ -1,7 +1,6 @@
|
|||
FloofProtocol { 2 25 187636631533261907981725980838781846034 }
|
||||
DEFINITIONS EXPLICIT TAGS ::= BEGIN
|
||||
|
||||
ActId ::= OCTET STRING (SIZE(16..1024))
|
||||
Signature ::= SEQUENCE {
|
||||
algorithm OBJECT IDENTIFIER,
|
||||
value OCTET STRING
|
||||
|
@ -26,8 +25,6 @@ RelativeDistinguishedName ::= SEQUENCE {
|
|||
RDNSequence ::= SEQUENCE OF RelativeDistinguishedName
|
||||
|
||||
XferInner ::= SEQUENCE {
|
||||
id ActId,
|
||||
|
||||
-- message topic --
|
||||
topic RDNSequence,
|
||||
|
||||
|
@ -68,7 +65,7 @@ SummaryDirection ::= ENUMERATED {
|
|||
|
||||
Summary ::= SEQUENCE {
|
||||
direction SummaryDirection,
|
||||
id ActId
|
||||
ids SET OF OCTET STRING
|
||||
}
|
||||
|
||||
ProtoMessage ::= CHOICE {
|
||||
|
|
79
lib/floof.ex
79
lib/floof.ex
|
@ -58,16 +58,17 @@ defmodule Floof do
|
|||
backlog = handle_incoming(client, backlog, dcd)
|
||||
serve(client, backlog)
|
||||
|
||||
{:fwdxfer, {dcdid, dcd}} ->
|
||||
:ok = send_summary(client, :requestpull, dcdid)
|
||||
serve(client, Map.put(backlog, dcdid, dcd))
|
||||
{:fwdxfer, {dcdhash, dcd}} ->
|
||||
:ok = send_summary(client, :requestpull, [dcdhash])
|
||||
serve(client, Map.put(backlog, dcdhash, dcd))
|
||||
|
||||
{:SessionPushed, key} ->
|
||||
{backlog, added_ids} = extract_backlog(key, backlog, MapSet.new())
|
||||
for id <- added_ids, do: :ok = send_summary(client, :requestpull, id)
|
||||
{backlog, added_hashs} = extract_backlog(key, backlog, MapSet.new())
|
||||
:ok = send_summary(client, :requestpull, added_hashs)
|
||||
serve(client, backlog)
|
||||
|
||||
{:tcp_closed, _} ->
|
||||
# TODO: we need to put all the backlogged stuff back...
|
||||
nil
|
||||
|
||||
x ->
|
||||
|
@ -89,43 +90,44 @@ defmodule Floof do
|
|||
|
||||
backlog
|
||||
|
||||
{:summary, {:Summary, direction, id}} ->
|
||||
{:summary, {:Summary, direction, hashes}} ->
|
||||
case direction do
|
||||
:requestpull ->
|
||||
:ok =
|
||||
send_summary(
|
||||
client,
|
||||
if Floof.Distributor.have_we(id) do
|
||||
:drop
|
||||
else
|
||||
:pull
|
||||
end,
|
||||
id
|
||||
)
|
||||
{to_pull, to_drop} = partition_hashes(hashes)
|
||||
|
||||
if not Enum.empty?(to_pull) do
|
||||
:ok = send_summary(client, :pull, to_pull)
|
||||
end
|
||||
|
||||
if not Enum.empty?(to_pull) do
|
||||
:ok = send_summary(client, :drop, to_drop)
|
||||
end
|
||||
|
||||
backlog
|
||||
|
||||
:pull ->
|
||||
case Map.fetch(backlog, id) do
|
||||
{:ok, item} ->
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
|
||||
:ok = :gen_tcp.send(client, encd)
|
||||
for mhash <- hashes do
|
||||
case Map.fetch(backlog, mhash) do
|
||||
{:ok, item} ->
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
|
||||
:ok = :gen_tcp.send(client, encd)
|
||||
|
||||
:error ->
|
||||
nil
|
||||
:error ->
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
backlog
|
||||
|
||||
:drop ->
|
||||
Map.delete(backlog, id)
|
||||
Map.drop(backlog, hashes)
|
||||
end
|
||||
|
||||
{:xfer, xf} ->
|
||||
id = Floof.Message.emit(xf)
|
||||
mhash = Floof.Message.emit(xf)
|
||||
# notify the remote end that we successfully consumed the message
|
||||
# non-fatal if this fails
|
||||
send_summary(client, :drop, id)
|
||||
send_summary(client, :drop, [mhash])
|
||||
backlog
|
||||
end
|
||||
end
|
||||
|
@ -135,14 +137,33 @@ defmodule Floof do
|
|||
:empty ->
|
||||
{backlog, added_ids}
|
||||
|
||||
{:value, {id, entry}} ->
|
||||
extract_backlog(key, Map.put(backlog, id, entry), MapSet.put(added_ids, id))
|
||||
{:value, {mhash, entry}} ->
|
||||
extract_backlog(key, Map.put(backlog, mhash, entry), MapSet.put(added_ids, mhash))
|
||||
end
|
||||
end
|
||||
|
||||
defp send_summary(client, direction, dcdid) do
|
||||
dat = {:Summary, direction, dcdid}
|
||||
defp send_summary(client, direction, dcdids) do
|
||||
dat = {:Summary, direction, dcdids}
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
|
||||
:gen_tcp.send(client, encd)
|
||||
end
|
||||
|
||||
defp partition_hashes(xs) do
|
||||
xs |> Floof.Distributor.have_we_multi() |> partition_list(MapSet.new(), MapSet.new())
|
||||
end
|
||||
|
||||
defp partition_list([], lo, hi) do
|
||||
{lo, hi}
|
||||
end
|
||||
|
||||
defp partition_list([{key, cnd} | xs], lo, hi) do
|
||||
{lo2, hi2} =
|
||||
if cnd do
|
||||
{lo, MapSet.put(hi, key)}
|
||||
else
|
||||
{MapSet.put(lo, key), hi}
|
||||
end
|
||||
|
||||
partition_list(xs, lo2, hi2)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -31,8 +31,12 @@ defmodule Floof.Distributor do
|
|||
|
||||
# client interface
|
||||
|
||||
def have_we(id) do
|
||||
GenServer.call(__MODULE__, {:haveWe, id})
|
||||
def have_we(mhash) do
|
||||
GenServer.call(__MODULE__, {:haveWe, mhash})
|
||||
end
|
||||
|
||||
def have_we_multi(mhashes) do
|
||||
GenServer.call(__MODULE__, {:haveWeMulti, mhashes})
|
||||
end
|
||||
|
||||
def register(pid) do
|
||||
|
@ -75,7 +79,7 @@ defmodule Floof.Distributor do
|
|||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:xfer, {origin, dcdid, dcd, attrs}}, state) do
|
||||
def handle_cast({:xfer, {origin, dcdhash, dcd, attrs}}, state) do
|
||||
if check_incoming(dstate(state, :fconf), dcd, attrs) do
|
||||
spawn_link(fn ->
|
||||
{:XferBlob, source, ttl, old_markers, signature, data} = dcd
|
||||
|
@ -84,25 +88,31 @@ defmodule Floof.Distributor do
|
|||
dcd2 = {:XferBlob, source, ttl, new_markers, signature, data}
|
||||
|
||||
for trg <- MapSet.delete(dstate(state, :trgs), origin) do
|
||||
send(trg, {:fwdxfer, {dcdid, dcd2}})
|
||||
send(trg, {:fwdxfer, {dcdhash, dcd2}})
|
||||
end
|
||||
|
||||
oset = MapSet.put(MapSet.new(), origin)
|
||||
Floof.SessionManager.push_all({dcdid, dcd2}, oset)
|
||||
Floof.SessionManager.push_all({dcdhash, dcd2}, oset)
|
||||
end)
|
||||
|
||||
Floof.LruCache.put(dstate(state, :seen), dcdid, {})
|
||||
Floof.LruCache.put(dstate(state, :seen), dcdhash, {})
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:haveWe, id}, _, state) do
|
||||
retval = Floof.LruCache.get(dstate(state, :seen), id) != nil
|
||||
def handle_call({:haveWe, mhash}, _, state) do
|
||||
retval = Floof.LruCache.get(dstate(state, :seen), mhash) != nil
|
||||
{:reply, retval, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:haveWeMulti, mhashes}, _, state) do
|
||||
retval = Floof.LruCache.get_multi(dstate(state, :seen), mhashes)
|
||||
{:reply, for({key, x} <- retval, into: %{}, do: {key, x != nil}), state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:EXIT, pid, _}, state) do
|
||||
{:noreply, dstate(state, trgs: MapSet.delete(dstate(state, :trgs), pid))}
|
||||
|
|
|
@ -45,6 +45,10 @@ defmodule Floof.LruCache do
|
|||
GenServer.call(server, {:get, key})
|
||||
end
|
||||
|
||||
def get_multi(server, keys) do
|
||||
GenServer.call(server, {:getMulti, keys})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Removes a kvp from the cache.
|
||||
## Parameters
|
||||
|
@ -87,17 +91,24 @@ defmodule Floof.LruCache do
|
|||
end
|
||||
|
||||
def handle_call({:get, key}, _from, lru_state) do
|
||||
time_result = :ets.lookup(lru_state.position_table, key)
|
||||
{:reply,
|
||||
case :ets.lookup(lru_state.position_table, key) do
|
||||
[{_, time_key}] -> update_item_position(lru_state, key, time_key)
|
||||
[] -> nil
|
||||
end, lru_state}
|
||||
end
|
||||
|
||||
case time_result do
|
||||
[{_, time_key}] ->
|
||||
val = update_item_position(lru_state, key, time_key)
|
||||
{:reply, val, lru_state}
|
||||
def handle_call({:getMulti, keys}, _from, lru_state) do
|
||||
position_table = lru_state.position_table
|
||||
|
||||
# key not found in cache
|
||||
[] ->
|
||||
{:reply, nil, lru_state}
|
||||
end
|
||||
{:reply,
|
||||
Enum.map(keys, fn key ->
|
||||
{key,
|
||||
case :ets.lookup(position_table, key) do
|
||||
[{_, time_key}] -> update_item_position(lru_state, key, time_key)
|
||||
[] -> nil
|
||||
end}
|
||||
end), lru_state}
|
||||
end
|
||||
|
||||
def handle_call({:delete, key}, _from, lru_state) do
|
||||
|
@ -127,9 +138,10 @@ defmodule Floof.LruCache do
|
|||
defp update_item_position(lru_state, key, time_key) do
|
||||
# Puts item in back of table
|
||||
counter = :erlang.unique_integer([:monotonic])
|
||||
[{_, {_key, val}}] = :ets.lookup(lru_state.cache_table, time_key)
|
||||
:ets.delete(lru_state.cache_table, time_key)
|
||||
:ets.insert(lru_state.cache_table, {counter, {key, val}})
|
||||
cache_table = lru_state.cache_table
|
||||
[{_, {_key, val}}] = :ets.lookup(cache_table, time_key)
|
||||
:ets.delete(cache_table, time_key)
|
||||
:ets.insert(cache_table, {counter, {key, val}})
|
||||
:ets.insert(lru_state.position_table, {key, counter})
|
||||
val
|
||||
end
|
||||
|
|
|
@ -5,9 +5,10 @@ defmodule Floof.Message do
|
|||
|
||||
def emit(xf) do
|
||||
{:XferBlob, _, _, _, _, xfinner} = xf
|
||||
{:ok, {:XferInner, id, _, attrs, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
|
||||
GenServer.cast(Floof.Distributor, {:xfer, {self(), id, xf, attrs}})
|
||||
id
|
||||
{:ok, {:XferInner, _, attrs, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
|
||||
mhash = :crypto.hash(:blake2b, xfinner)
|
||||
GenServer.cast(Floof.Distributor, {:xfer, {self(), mhash, xf, attrs}})
|
||||
mhash
|
||||
end
|
||||
|
||||
def sign(xf, secret_key) do
|
||||
|
@ -17,12 +18,11 @@ defmodule Floof.Message do
|
|||
{:XferBlob, source, ttl, markers, signature2, xfinner}
|
||||
end
|
||||
|
||||
def pack_inner(id, topic, attrs, severity, data) do
|
||||
def pack_inner(topic, attrs, severity, data) do
|
||||
# make sure that attrs is ordered to ensure proper repr
|
||||
attrs = :ordsets.to_list(:ordsets.from_list(attrs))
|
||||
|
||||
{:ok, data} =
|
||||
:FloofProtocol.encode(:XferInner, {:XferInner, id, topic, attrs, severity, data})
|
||||
{:ok, data} = :FloofProtocol.encode(:XferInner, {:XferInner, topic, attrs, severity, data})
|
||||
|
||||
data
|
||||
end
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -17,7 +17,7 @@ defmodule Floof.MixProject do
|
|||
# Run "mix help compile.app" to learn about applications.
|
||||
def application do
|
||||
[
|
||||
extra_applications: [:asn1, :logger],
|
||||
extra_applications: [:asn1, :crypto, :logger],
|
||||
mod: {Floof.Application, []},
|
||||
env: [
|
||||
# listen_port cam be set to `nil` to disable to server-socket listener
|
||||
|
|
|
@ -44,9 +44,8 @@ defmodule FloofTest do
|
|||
|
||||
{:ok, secret_key} = File.read("mock_keydb/test01.secret")
|
||||
source = [{:RelativeDistinguishedName, {0, 9, 2342, 19_200_300, 100, 1, 25}, "test"}]
|
||||
msg_id = <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>
|
||||
msg_topic = [{:RelativeDistinguishedName, {2, 5, 4, 41}, "test topic"}]
|
||||
msg_inner = Floof.Message.pack_inner(msg_id, msg_topic, [], :info, <<0, 0>>)
|
||||
msg_inner = Floof.Message.pack_inner(msg_topic, [], :info, <<0, 0>>)
|
||||
ttl = :erlang.convert_time_unit(:erlang.system_time(), :native, :second) + 60
|
||||
msg_outer = Floof.Message.build_outer(source, ttl, [], secret_key, msg_inner)
|
||||
{:ok, msg_encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, msg_outer})
|
||||
|
@ -58,7 +57,7 @@ defmodule FloofTest do
|
|||
:gen_tcp.close(socket)
|
||||
|
||||
receive do
|
||||
{:fwdxfer, {id, _}} -> assert id == msg_id
|
||||
{:fwdxfer, {id, _}} -> assert id == :crypto.hash(:blake2b, msg_inner)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue