floof/lib/floof/distributor.ex
2022-11-26 02:29:26 +01:00

176 lines
4.7 KiB
Elixir

defmodule Floof.Distributor do
@moduledoc """
(Re-)distributor for incoming messages,
including the filtering applied before they are forwarded.
"""
use GenServer
require Logger
require Record
# Filter configuration record (`fconfig`).
#
#   * `markers`       - filter entries of the form `{required?, marker}` that
#                       are checked against the marker list of an incoming blob
#   * `attrs`         - filter entries of the same form, checked against the
#                       attributes delivered alongside a blob
#   * `set_markers`   - markers put in front of a blob's own marker list
#                       (duplicates removed) before the blob is forwarded
#   * `pubkey_mgr`    - module whose `get_pubkey/2` is called to look up the
#                       public key of a blob's source
#   * `pubkey_config` - first argument handed to `pubkey_mgr.get_pubkey/2`
Record.defrecord(:fconfig,
markers: [],
attrs: [],
set_markers: [],
pubkey_mgr: Floof.Distributor.PubkeyMgr.FileDB,
pubkey_config: %{:keydb => ""}
)
# Server state record (`dstate`): `fconf` holds the filter configuration given
# to `init/1`, `trgs` is the set of pids registered as forwarding targets.
Record.defrecord(:dstate, fconf: {}, trgs: MapSet.new())
@doc """
Starts the distributor and registers it locally under the module name.

`initval` is handed unchanged to `init/1`, where it becomes the filter
configuration kept in the server state.
"""
def start_link(initval) do
  server_opts = [name: __MODULE__]
  GenServer.start_link(__MODULE__, initval, server_opts)
end
# Traps exits so that the termination of a linked process is delivered as an
# `{:EXIT, pid, reason}` message, then starts with the given filter
# configuration and the record's default (empty) target set.
@impl true
def init(fconf) do
  Process.flag(:trap_exit, true)
  initial_state = dstate(fconf: fconf)
  {:ok, initial_state}
end
# client interface
@doc """
Asynchronously asks the distributor to add `pid` to its forwarding targets.
"""
def register(pid), do: GenServer.cast(__MODULE__, {:register, pid})
@doc """
Asynchronously asks the distributor to drop `pid` from its forwarding targets.
"""
def unregister(pid), do: GenServer.cast(__MODULE__, {:unregister, pid})
# server interface
# Registers `pid` as a forwarding target. A live process is linked to the
# server and added to the target set; a pid that is no longer alive is ignored
# and the state is returned untouched.
@impl true
def handle_cast({:register, pid}, state) do
  case :erlang.is_process_alive(pid) do
    true ->
      Process.link(pid)
      targets = MapSet.put(dstate(state, :trgs), pid)
      {:noreply, dstate(state, trgs: targets)}

    false ->
      {:noreply, state}
  end
end
# Removes `pid` from the target set and drops the link to it.
@impl true
def handle_cast({:unregister, pid}, state) do
  Process.unlink(pid)
  remaining = MapSet.delete(dstate(state, :trgs), pid)
  {:noreply, dstate(state, trgs: remaining)}
end
# Handles an incoming transfer. The blob is validated synchronously; when it
# passes, its hash is marked as seen and the actual fan-out is handed to a
# linked helper process so that the server's message queue is only blocked
# for the check and the seen marker. The state itself never changes here.
@impl true
def handle_cast({:xfer, {origin, dcdhash, dcd, attrs}}, state) do
  # pull out only what the helper process needs, so the closure does not
  # capture the whole server state
  fconf = dstate(state, :fconf)

  if check_incoming(fconf, dcdhash, dcd, attrs) do
    # never echo the blob back to the process it came from
    receivers = MapSet.delete(dstate(state, :trgs), origin)
    Floof.DistributorSeen.mark_seen(dcdhash)

    spawn_link(fn ->
      {:XferBlob, source, ttl, old_markers, signature, data} = dcd
      # configured markers go first; duplicates are dropped
      merged_markers = Enum.uniq(fconfig(fconf, :set_markers) ++ old_markers)
      outgoing = {:XferBlob, source, ttl, merged_markers, signature, data}
      notification = {:fwdxfer, {dcdhash, outgoing}}

      for receiver <- receivers do
        send(receiver, notification)
      end

      Floof.SessionManager.set_for_all(dcdhash, outgoing, origin)
    end)
  end

  {:noreply, state}
end
# Handles plain messages sent to the server.
#
# Exits are trapped (see `init/1`), so the termination of any linked process
# arrives here as `{:EXIT, pid, reason}`; the pid is then removed from the
# target set. Deleting a pid that was never a target leaves the set unchanged.
#
# Any other message is logged and ignored. Without this catch-all clause an
# unexpected message would raise a `FunctionClauseError` and take the server
# down together with its set of registered targets.
@impl true
def handle_info({:EXIT, pid, _reason}, state) do
  remaining = MapSet.delete(dstate(state, :trgs), pid)
  {:noreply, dstate(state, trgs: remaining)}
end

def handle_info(msg, state) do
  Logger.error("#{inspect(__MODULE__)} received unexpected message: #{inspect(msg)}")
  {:noreply, state}
end
# Decides whether an incoming blob may be processed and forwarded.
#
# Returns `true` only when all of the following hold, checked in this order:
#   1. the signature algorithm is the one OID handled below,
#   2. the public key manager returns an Ed25519 key for the blob's source,
#   3. the blob's hash has not been seen before,
#   4. the detached signature verifies against the inner payload,
#   5. the current system time (in seconds) is still below `ttl`,
#   6. the configured marker/attribute filters accept the blob.
# Every other outcome returns `false`; all but the "already seen" case log
# the reason.
defp check_incoming(fconf, dcdhash, dcd, attrs) do
  {:XferBlob, source, ttl, _, {:Signature, sigalgo, sigval}, xfinner} = dcd

  case sigalgo do
    # Ed25519
    {1, 3, 6, 1, 4, 1, 11591, 15, 1} ->
      with {:ok, :ed25519, pubkey} <- get_pubkey(fconf, source) do
        cond do
          # content was already processed, so the packet can be ignored quietly
          Floof.DistributorSeen.have_we(dcdhash) ->
            false

          not :enacl.sign_verify_detached(sigval, xfinner, pubkey) ->
            Logger.error("packet signature invalid for #{inspect(xfinner)}")
            false

          System.system_time(:second) >= ttl ->
            Logger.warn("packet in processing expired at #{ttl}: #{inspect(xfinner)}")
            false

          not check_filters(fconf, dcd, attrs) ->
            Logger.debug("packet discarded according to filters: #{inspect(xfinner)}")
            false

          true ->
            true
        end
      else
        _no_usable_key ->
          Logger.warn(
            "unable to verify packet signature due to unknown signer: #{inspect(source)}"
          )

          false
      end

    _unsupported ->
      Logger.warn(
        "unable to verify packet signature due to unknown signature algo: #{inspect(sigalgo)}"
      )

      false
  end
end
# Applies the configured filters to a blob.
#
# Returns `true` only when the blob's markers satisfy every entry of the
# `markers` filter list and `attrs` satisfies every entry of the `attrs`
# filter list. Returns `false` otherwise, including when evaluating the
# filters fails (fail closed).
defp check_filters(fconf, {:XferBlob, _, _, markers, _, _}, attrs) do
  check_filters_markers2(fconfig(fconf, :markers), markers) &&
    check_filters_markers2(fconfig(fconf, :attrs), attrs)
rescue
  # `catch` on its own only intercepts `throw`. A malformed filter entry or a
  # marker/attribute value that is not a list surfaces as a raised error, so
  # those are rescued explicitly to reject the blob instead of crashing the
  # calling server.
  err in [FunctionClauseError, ArgumentError] ->
    Logger.error("packet filter evaluation failed: #{inspect(err)}")
    false
catch
  _ -> false
end
# Evaluates a single filter entry against a list of markers.
#
#   * `{true, mark}`  - satisfied when `mark` occurs in the list
#   * `{false, mark}` - satisfied when `mark` does not occur in the list
#
# Walks the list element by element and stops at the first occurrence of
# `mark`. When the list is exhausted without one, the result is the negation
# of the entry's flag.
defp check_filters_markers1({true, wanted}, [head | rest]) do
  if head == wanted do
    true
  else
    check_filters_markers1({true, wanted}, rest)
  end
end

defp check_filters_markers1({false, unwanted}, [head | rest]) do
  if head != unwanted do
    check_filters_markers1({false, unwanted}, rest)
  else
    false
  end
end

defp check_filters_markers1({must_be_present, _}, []), do: not must_be_present
# True when `markers` satisfies every entry of the filter list. Evaluation
# stops at the first entry that is not satisfied; an empty filter list
# accepts anything.
defp check_filters_markers2([], _markers), do: true

defp check_filters_markers2([entry | remaining], markers) do
  if check_filters_markers1(entry, markers) do
    check_filters_markers2(remaining, markers)
  else
    false
  end
end
# Looks up the public key for `source` by calling `get_pubkey/2` on the
# configured key manager module with the configured key manager settings.
defp get_pubkey(fconf, source) do
  manager = fconfig(fconf, :pubkey_mgr)
  manager_config = fconfig(fconf, :pubkey_config)
  manager.get_pubkey(manager_config, source)
end
end