defmodule Floof.Distributor do @moduledoc """ (Re-)Distributor for messages including involved filtering """ use GenServer require Logger require Record Record.defrecord(:fconfig, markers: [], attrs: [], set_markers: [], extra_filters: [] ) Record.defrecord(:dstate, fconf: {}, trgs: MapSet.new()) def start_link(initval) do GenServer.start_link(__MODULE__, initval, name: __MODULE__) end @impl true def init(fconf) do Process.flag(:trap_exit, true) {:ok, dstate(fconf: fconf)} end # client interface def register(pid) do GenServer.cast(__MODULE__, {:register, pid}) end def unregister(pid) do GenServer.cast(__MODULE__, {:unregister, pid}) end # server interface @impl true def handle_cast({:register, pid}, state) do if :erlang.is_process_alive(pid) do Process.link(pid) {:noreply, dstate(state, trgs: MapSet.put(dstate(state, :trgs), pid))} else {:noreply, state} end end @impl true def handle_cast({:unregister, pid}, state) do trgs = dstate(state, :trgs) Process.unlink(pid) {:noreply, dstate(state, trgs: MapSet.delete(trgs, pid))} end @impl true def handle_cast({:xfer, {origin, dcdhash, dcd}}, state) do # we split the state such that only the parts necessary get # copied into the spawned-off process fconf = dstate(state, :fconf) if check_incoming(fconf, dcdhash, dcd) do trgs = MapSet.delete(dstate(state, :trgs), origin) Floof.DistributorSeen.mark_seen(dcdhash) # spawn a separate process to avoid blocking the queue # we only block the queue above for the check + seen marker spawn_link(fn -> {:XferBlob, ttl, old_markers, signature, data} = dcd set_markers = fconfig(fconf, :set_markers) new_markers = Enum.uniq(set_markers ++ old_markers) dcd = {:XferBlob, ttl - 1, new_markers, signature, data} dcd = Enum.reduce( fconfig(fconf, :extra_filters), dcd, fn f, dcdx -> case dcdx do nil -> nil dcdx -> f.(dcdx) end end ) if dcd != nil do for trg <- trgs do send(trg, {:fwdxfer, {dcdhash, dcd}}) end if ttl > 1 do Floof.SessionManager.set_for_all(dcdhash, dcd, origin) end end end) end {:noreply, state} end @impl true def handle_info({:EXIT, pid, _}, state) do {:noreply, dstate(state, trgs: MapSet.delete(dstate(state, :trgs), pid))} end defp check_incoming(fconf, dcdhash, dcd) do {:XferBlob, ttl, _, {:Signature, sigalgo, pubkey, sigval}, xfinner} = dcd case sigalgo do {1, 3, 6, 1, 4, 1, 11591, 15, 1} -> # Ed25519 cond do Floof.DistributorSeen.have_we(dcdhash) -> # we can ignore this packet, its content was already processed Logger.debug("packet discarded, already seen #{Base.url_encode64(dcdhash)}") false not :enacl.sign_verify_detached(sigval, xfinner, pubkey) -> Logger.error("packet signature invalid for #{inspect(xfinner)}") false ttl == 0 -> Logger.warn("packet in processing expired (out of hops): #{inspect(xfinner)}") false not check_filters(fconf, dcd) -> Logger.debug("packet discarded according to filters: #{inspect(xfinner)}") false true -> true end _ -> Logger.warn( "unable to verify packet signature due to unknown signature algo: #{inspect(sigalgo)}" ) false end end defp check_filters(fconf, {:XferBlob, _, markers, _, xfinner}) do try do {:ok, {:XferInner, _, attrs, _, _}} = :FloofProtocol.decode(:XferInner, xfinner) cond do not check_filters_markers2(fconfig(fconf, :markers), markers) -> false not check_filters_markers2(fconfig(fconf, :attrs), attrs) -> false true -> true end catch _ -> false end end defp check_filters_markers1({true, mark}, [cmark | markers]) do mark == cmark || check_filters_markers1({true, mark}, markers) end defp check_filters_markers1({false, mark}, [cmark | markers]) do mark != cmark && check_filters_markers1({false, mark}, markers) end defp check_filters_markers1({xp, _}, []) do not xp end defp check_filters_markers2([xpm | fconf], markers) do check_filters_markers1(xpm, markers) && check_filters_markers2(fconf, markers) end defp check_filters_markers2([], _) do true end end