floof/lib/floof/distributor.ex
Alain Zscheile c48367ff09 send public key with every message
this is more efficient and solves key distribution concerns
2022-11-26 23:41:56 +01:00

163 lines
4.2 KiB
Elixir

defmodule Floof.Distributor do
  @moduledoc """
  (Re-)Distributor for messages
  including involved filtering.

  A single `GenServer`, registered under the module name, that keeps a set of
  target processes (see `register/1` and `unregister/1`).

  An incoming `{:xfer, {origin, dcdhash, dcd}}` cast is accepted only if it
  was not seen before, carries a valid signature, still has hops left and
  passes the configured filters. Accepted blobs are forwarded as
  `{:fwdxfer, {dcdhash, dcd}}` to every registered target except `origin`
  and handed to `Floof.SessionManager.set_for_all/3`.
  """

  use GenServer
  require Logger
  require Record

  # Filter configuration.
  #
  #   * `markers`     - `{required?, marker}` rules, checked against the markers
  #                     of an incoming blob
  #   * `attrs`       - `{required?, attr}` rules, checked against the attributes
  #                     of the decoded inner message
  #   * `set_markers` - markers merged into every blob before it is forwarded
  #
  # A rule `{true, x}` demands that `x` is present, `{false, x}` that it is absent.
  Record.defrecord(:fconfig,
    markers: [],
    attrs: [],
    set_markers: []
  )

  # Server state: the filter configuration plus the set of registered targets.
  Record.defrecord(:dstate, fconf: {}, trgs: MapSet.new())

  # Ed25519, the only signature algorithm this module is able to verify
  @ed25519_oid {1, 3, 6, 1, 4, 1, 11591, 15, 1}

  @doc """
  Starts the distributor and registers it under the module name.

  `initval` is stored as the filter configuration; it has to be an `fconfig`
  record, because its fields are read when messages are processed.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(initval) do
    GenServer.start_link(__MODULE__, initval, name: __MODULE__)
  end

  @impl true
  def init(fconf) do
    # exits of linked targets (and of the spawned forwarding workers) have to
    # arrive as messages instead of taking this server down with them
    Process.flag(:trap_exit, true)
    {:ok, dstate(fconf: fconf)}
  end

  # client interface

  @doc """
  Asynchronously adds `pid` to the set of forwarding targets.

  The request is a cast, so `:ok` is returned immediately. A `pid` that is
  no longer alive when the request gets processed is ignored.
  """
  @spec register(pid()) :: :ok
  def register(pid) do
    GenServer.cast(__MODULE__, {:register, pid})
  end

  @doc """
  Asynchronously removes `pid` from the set of forwarding targets.
  """
  @spec unregister(pid()) :: :ok
  def unregister(pid) do
    GenServer.cast(__MODULE__, {:unregister, pid})
  end

  # server interface

  @impl true
  def handle_cast({:register, pid}, state) do
    if Process.alive?(pid) do
      # the link, together with trap_exit, turns the death of a target into
      # an `{:EXIT, pid, reason}` message, which removes it from the set again
      Process.link(pid)
      {:noreply, dstate(state, trgs: MapSet.put(dstate(state, :trgs), pid))}
    else
      {:noreply, state}
    end
  end

  def handle_cast({:unregister, pid}, state) do
    Process.unlink(pid)
    {:noreply, dstate(state, trgs: MapSet.delete(dstate(state, :trgs), pid))}
  end

  def handle_cast({:xfer, {origin, dcdhash, dcd}}, state) do
    fconf = dstate(state, :fconf)

    if check_incoming(fconf, dcdhash, dcd) do
      # we split the state such that only the parts necessary get
      # copied into the spawned-off process
      trgs = MapSet.delete(dstate(state, :trgs), origin)
      set_markers = fconfig(fconf, :set_markers)

      Floof.DistributorSeen.mark_seen(dcdhash)

      # spawn a separate process to avoid blocking the queue
      # we only block the queue above for the check + seen marker
      spawn_link(fn -> forward(origin, dcdhash, dcd, set_markers, trgs) end)
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, pid, _reason}, state) do
    # a linked process exited; for a forwarding worker the delete is a no-op
    {:noreply, dstate(state, trgs: MapSet.delete(dstate(state, :trgs), pid))}
  end

  def handle_info(msg, state) do
    # without this clause any stray message would raise a FunctionClauseError
    # and take the whole distributor down
    Logger.debug("ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Runs inside the spawned worker: decrements the TTL, merges the configured
  # markers into the blob and hands the result to all targets and to the
  # session manager.
  defp forward(origin, dcdhash, {:XferBlob, ttl, old_markers, signature, data}, set_markers, trgs) do
    new_markers = Enum.uniq(set_markers ++ old_markers)
    dcd2 = {:XferBlob, ttl - 1, new_markers, signature, data}

    Enum.each(trgs, fn trg -> send(trg, {:fwdxfer, {dcdhash, dcd2}}) end)

    Floof.SessionManager.set_for_all(dcdhash, dcd2, origin)
  end

  # Decides whether an incoming blob gets processed at all. Returns `true`
  # only for a not-yet-seen blob with a valid Ed25519 signature, remaining
  # hops and a positive filter result; every rejection except "already seen"
  # is logged.
  defp check_incoming(fconf, dcdhash, dcd) do
    {:XferBlob, ttl, _markers, {:Signature, sigalgo, pubkey, sigval}, xfinner} = dcd

    case sigalgo do
      @ed25519_oid ->
        cond do
          Floof.DistributorSeen.have_we(dcdhash) ->
            # we can ignore this packet, its content was already processed
            false

          not :enacl.sign_verify_detached(sigval, xfinner, pubkey) ->
            Logger.error("packet signature invalid for #{inspect(xfinner)}")
            false

          # `<=` instead of `==`: a negative TTL must not slip through and
          # get decremented further
          ttl <= 0 ->
            Logger.warning("packet in processing expired (out of hops): #{inspect(xfinner)}")
            false

          not check_filters(fconf, dcd) ->
            Logger.debug("packet discarded according to filters: #{inspect(xfinner)}")
            false

          true ->
            true
        end

      _ ->
        Logger.warning(
          "unable to verify packet signature due to unknown signature algo: #{inspect(sigalgo)}"
        )

        false
    end
  end

  # Applies the configured marker and attribute rules to a blob. A payload
  # that does not decode to an `XferInner` is rejected.
  defp check_filters(fconf, {:XferBlob, _ttl, markers, _signature, xfinner}) do
    # matched explicitly: a failed assertive match raises a MatchError, which
    # a bare `catch` clause does not intercept (it only handles throws)
    case :FloofProtocol.decode(:XferInner, xfinner) do
      {:ok, {:XferInner, _, attrs, _, _}} ->
        check_filters_markers2(fconfig(fconf, :markers), markers) and
          check_filters_markers2(fconfig(fconf, :attrs), attrs)

      _ ->
        false
    end
  end

  # A single rule: `{true, mark}` holds if `mark` is among `markers`,
  # `{false, mark}` holds if it is not.
  defp check_filters_markers1({true, mark}, markers) do
    Enum.any?(markers, fn cmark -> mark == cmark end)
  end

  defp check_filters_markers1({false, mark}, markers) do
    Enum.all?(markers, fn cmark -> mark != cmark end)
  end

  # All rules have to hold; an empty rule list accepts everything.
  defp check_filters_markers2(rules, markers) do
    Enum.all?(rules, fn rule -> check_filters_markers1(rule, markers) end)
  end
end