118 lines
3.8 KiB
Elixir
118 lines
3.8 KiB
Elixir
defmodule Floof.Distributor do
|
|
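@moduledoc """
(Re-)distributor for messages.

Incoming blobs are verified (signature, expiry time and the configured
marker/attribute filters) and, if accepted, forwarded to every
registered target process.
"""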
|
|
|
|
use Task, restart: :permanent
|
|
|
|
require Logger
|
|
require Record
|
|
# Distributor configuration record (`fconfig`):
#   markers     - filter rules `{flag, marker}` checked against a blob's markers
#   attrs       - filter rules `{flag, attr}` checked against the inner message's attrs
#                 (flag `true`: the value must be present; `false`: it must be absent)
#   set_markers - markers merged into a blob's marker list before it is forwarded
#   keydir      - directory holding the signers' public keys; with the default ""
#                 every key lookup fails
Record.defrecord(:fconfig, markers: [], attrs: [], set_markers: [], keydir: "")
|
|
|
|
@doc """
Starts the distributor as a linked task that executes `run/1` with `fconf`.

Returns whatever `Task.start_link/3` returns.
"""
def start_link(fconf), do: Task.start_link(__MODULE__, :run, [fconf])
|
|
|
|
@doc """
Task entry point.

Enables exit trapping, so exits of linked processes arrive as
`{:EXIT, pid, reason}` messages, and then enters the receive loop with
no registered targets and no seen message ids.
"""
def run(fconf) do
  _previous = Process.flag(:trap_exit, true)
  loop(fconf, _targets = [], _seen = [])
end
|
|
|
|
# Main receive loop of the distributor.
#
# State:
#   fconf - the `fconfig` record
#   trgs  - pids registered to receive forwarded blobs (no duplicates)
#   seen  - ids of every blob received so far, newest first
#
# Handled messages:
#   {:xfer, dcd}            - verify `dcd` and forward it to all targets
#   {:havewe, answpid, id}  - reply `{:havewe, id, boolean}` to `answpid`
#   {:register, pid}        - link to `pid` and add it to the targets
#   {:EXIT, pid, _}         - drop `pid` from the targets
#   anything else           - logged and discarded
defp loop(fconf, trgs, seen) do
  receive do
    {:xfer, dcd} ->
      # The check runs in its own linked process, so the loop keeps
      # serving messages while `check_incoming/2` is working.
      spawn_link(fn ->
        if check_incoming(fconf, dcd) do
          {:XferBlob, priority, ttl, old_markers, signature, data} = dcd
          new_markers = Enum.uniq(fconfig(fconf, :set_markers) ++ old_markers)
          dcd2 = {:XferBlob, priority, ttl, new_markers, signature, data}
          for trg <- trgs, do: send(trg, {:fwdxfer, dcd2})
        end
      end)

      # The id is recorded whether or not the blob passes the check.
      loop(fconf, trgs, [Floof.get_xfer_id(dcd) | seen])

    {:havewe, answpid, id} ->
      send(answpid, {:havewe, id, Enum.any?(seen, fn x -> id == x end)})
      loop(fconf, trgs, seen)

    {:register, pid} when is_pid(pid) ->
      Process.link(pid)

      # Registering twice must not create a second entry: the target would
      # receive every blob twice and one stale entry would survive its exit.
      new_trgs = if pid in trgs, do: trgs, else: [pid | trgs]
      loop(fconf, new_trgs, seen)

    {:EXIT, pid, _} ->
      # Also received for the short-lived checker processes spawned above;
      # for those the rejection is a no-op.
      loop(fconf, Enum.reject(trgs, fn trg -> trg == pid end), seen)

    other ->
      # Without this clause unmatched messages would pile up in the mailbox
      # forever and slow down every subsequent selective receive.
      Logger.warn("ignoring unexpected message: #{inspect(other)}")
      loop(fconf, trgs, seen)
  end
end
|
|
|
|
# Object identifier 1.3.6.1.4.1.11591.15.1 in tuple form; signatures tagged
# with it are treated as Ed25519.
@ed25519_oid {1, 3, 6, 1, 4, 1, 11591, 15, 1}

# Decides whether an incoming `:XferBlob` may be forwarded.
#
# Returns `true` only if, in this order:
#   1. the signature algorithm is Ed25519,
#   2. a public key for the signer named in the inner message is found,
#   3. the detached signature over the re-encoded inner message verifies,
#   4. the current system time in seconds is still below `ttl`,
#   5. the blob passes the configured filters.
# Every rejection is logged and yields `false`.
defp check_incoming(fconf, dcd) do
  {:XferBlob, _priority, ttl, _markers, {:Signature, algo, sig}, inner} = dcd

  case algo do
    @ed25519_oid ->
      {:XferInner, _, signer, _, _, _, _} = inner

      with {:ok, :ed25519, pubkey} <- get_pubkey(fconf, signer) do
        # The signature covers the encoded form of the inner message.
        {:ok, encoded} = :FloofProtocol.encode(:XferInner, inner)

        cond do
          not :enacl.sign_verify_detached(sig, encoded, pubkey) ->
            Logger.error("packet signature invalid for #{inspect(inner)}")
            false

          :erlang.system_time(:second) >= ttl ->
            Logger.warn("packet in processing expired at #{ttl}: #{inspect(inner)}")
            false

          not check_filters(fconf, dcd) ->
            Logger.debug("packet discarded according to filters: #{inspect(inner)}")
            false

          true ->
            true
        end
      else
        _ ->
          Logger.warn("unable to verify packet signature due to unknown signer: #{inspect(signer)}")
          false
      end

    _ ->
      Logger.warn("unable to verify packet signature due to unknown signature algo: #{inspect(algo)}")
      false
  end
end
|
|
|
|
# Returns `true` when the blob satisfies both rule sets of the configuration:
# the `:markers` rules are applied to the blob's markers and, only if those
# pass, the `:attrs` rules are applied to the inner message's attrs.
defp check_filters(fconf, {:XferBlob, _, _, markers, _, {:XferInner, _, _, _, attrs, _, _}}) do
  check_filters_markers2(fconfig(fconf, :markers), markers) and
    check_filters_markers2(fconfig(fconf, :attrs), attrs)
end
|
|
|
|
# Evaluates a single filter rule against a list of markers.
#
#   {true, mark}  - satisfied when `mark` occurs in the list
#   {false, mark} - satisfied when `mark` does not occur in the list
#
# Comparison uses `==`/`!=`, so an empty list fails a `true` rule and
# passes a `false` rule.
defp check_filters_markers1({true, mark}, markers) when is_list(markers) do
  Enum.any?(markers, fn candidate -> mark == candidate end)
end

defp check_filters_markers1({false, mark}, markers) when is_list(markers) do
  Enum.all?(markers, fn candidate -> mark != candidate end)
end
|
|
|
|
# Returns `true` when every rule in `rules` is satisfied by `markers`
# (trivially `true` for an empty rule list). Evaluation stops at the
# first rule that fails.
defp check_filters_markers2(rules, markers) when is_list(rules) do
  Enum.all?(rules, fn rule -> check_filters_markers1(rule, markers) end)
end
|
|
|
|
# Looks up the public key of `source` in the configured key directory.
#
# The key file is named after the URL-safe Base64 form of the encoded
# `:RDNSequence` of `source`. Returns `{:ok, :ed25519, file_contents}` when
# the file can be read, and `:error` when no key directory is configured
# (`""`) or the read fails.
defp get_pubkey(fconf, source) do
  case fconfig(fconf, :keydir) do
    "" ->
      :error

    keydir ->
      {:ok, encoded_name} = :FloofProtocol.encode(:RDNSequence, source)
      file_name = Base.url_encode64(encoded_name)

      with {:ok, key} <- File.read(keydir <> "/" <> file_name) do
        {:ok, :ed25519, key}
      else
        {:error, _} -> :error
      end
  end
end
|
|
end
|