diff --git a/asn1/FloofProtocol.asn1 b/asn1/FloofProtocol.asn1 index eea9b8d..e6a3371 100644 --- a/asn1/FloofProtocol.asn1 +++ b/asn1/FloofProtocol.asn1 @@ -2,10 +2,6 @@ FloofProtocol { 2 25 187636631533261907981725980838781846034 } DEFINITIONS EXPLICIT TAGS ::= BEGIN ActId ::= OCTET STRING (SIZE(16..1024)) -ActIdBunch ::= SEQUENCE(SIZE(1..1024)) OF ActId -Summary ::= SEQUENCE { - ids ActIdBunch -} Signature ::= SEQUENCE { algorithm OBJECT IDENTIFIER, value OCTET STRING @@ -32,9 +28,6 @@ RDNSequence ::= SEQUENCE OF RelativeDistinguishedName XferInner ::= SEQUENCE { id ActId, - -- domain name -- - domain PrintableString (SIZE(1..253)), - -- source system name -- source RDNSequence, @@ -59,7 +52,7 @@ XferBlob ::= SEQUENCE { ttl INTEGER (0..MAX), -- message markers (e.g. review data and such) - markers SEQUENCE OF RelativeDistinguishedName, + markers SET OF RelativeDistinguishedName, -- signature of DER serialization of *data* signature Signature, @@ -67,7 +60,18 @@ XferBlob ::= SEQUENCE { data XferInner } -ProtoPost ::= CHOICE { +SummaryDirection ::= ENUMERATED { + pull (0), + rejectpush (1), + requestpull (2) +} + +Summary ::= SEQUENCE { + direction SummaryDirection, + id ActId +} + +ProtoMessage ::= CHOICE { summary [0] Summary, xfer [1] XferBlob } diff --git a/lib/floof.ex b/lib/floof.ex index 9243f74..a765c8a 100644 --- a/lib/floof.ex +++ b/lib/floof.ex @@ -16,14 +16,69 @@ defmodule Floof do loop_acceptor(socket) end - def loop_acceptor(socket) do + defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child( - Floof.TaskSupervisor, fn -> serve(client) end) + Floof.TaskSupervisor, fn -> serve(client, []) end) :ok = :gen_tcp.controlling_process(client, pid) + # switch to active now that we handed over the client + :inet.setopts(client, [{:active, true}]) + send(Floof.Distributor, {:register, pid}) loop_acceptor(socket) end - def serve(_) do + defp serve(client, backlog) do + receive do + {:tcp, _, message} -> + {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message) + backlog2 = handle_incoming(client, backlog, dcd) + serve(client, backlog2) + {:fwdxfer, dcdmessage} -> + dcdid = get_xfer_id dcdmessage + handle_outgoing(client, dcdid) + serve(client, [{dcdid, dcdmessage} | backlog]) + {:havewe, id, res} -> + dat = {:Summary, if res do :rejectpush else :pull end, id} + {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat}) + :ok = client.send(encd) + serve(client, backlog) + x -> Logger.warn("unable to handle request: #{inspect(x)}") + serve(client, backlog) + end + end + + defp handle_incoming(client, backlog, dcd) do + Logger.debug("got packet #{inspect(dcd)}") + case dcd do + {:summary, {:Summary, direction, id}} -> + case direction do + :requestpull -> + send(Floof.Distributor, {:havewe, self(), id}) + backlog + :pull -> + case List.keytake(backlog, id, 0) do + {{_, item}, backlog2} -> + {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item}) + :ok = client.send(encd) + backlog2 + nil -> backlog + end + :rejectpush -> + List.keydelete(backlog, id, 0) + end + {:xfer, xf} -> + send(Floof.Distributor, xf) + backlog + end + end + + defp handle_outgoing(client, dcdid) do + dat = {:Summary, :requestpull, dcdid} + {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat}) + :ok = client.send(encd) + end + + def get_xfer_id({:XferBlob, _, _, _, _, {:XferInner, id, _, _, _, _, _}}) do + id end end diff --git a/lib/floof/application.ex b/lib/floof/application.ex index bc72c8c..dd6cad9 100644 --- a/lib/floof/application.ex +++ b/lib/floof/application.ex @@ -4,16 +4,20 @@ defmodule Floof.Application do @moduledoc false use Application + require Floof.Distributor @impl true def start(_type, _args) do port = String.to_integer(System.get_env("FLOOF_PORT") || "2540") + keydb = System.get_env("FLOOF_KEYDB") || "" + # TODO: remaining config settings for Floof.Distributor children = [ # Starts a worker by calling: Floof.Worker.start_link(arg) # {Floof.Worker, arg} {Task.Supervisor, name: Floof.TaskSupervisor}, - {Task, fn -> Floof.accept(port) end} + {Floof.Distributor, Floof.Distributor.fconfig(keydir: keydb)}, + {Task, fn -> Floof.accept(port) end}, ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/floof/distributor.ex b/lib/floof/distributor.ex new file mode 100644 index 0000000..53f8bc9 --- /dev/null +++ b/lib/floof/distributor.ex @@ -0,0 +1,118 @@ +defmodule Floof.Distributor do + @moduledoc """ + (Re-)Distributor for messages + including involved filtering + """ + + use Task, restart: :permanent + + require Logger + require Record + Record.defrecord(:fconfig, markers: [], attrs: [], set_markers: [], keydir: "") + + def start_link(fconf) do + Task.start_link(__MODULE__, :run, [fconf]) + end + + def run(fconf) do + Process.flag(:trap_exit, true) + loop(fconf, [], []) + end + + defp loop(fconf, trgs, seen) do + receive do + {:xfer, dcd} -> + spawn_link(fn -> if check_incoming(fconf, dcd) do + {:XferBlob, priority, ttl, old_markers, signature, data} = dcd + new_markers = Enum.uniq(fconfig(fconf, :set_markers) ++ old_markers) + dcd2 = {:XferBlob, priority, ttl, new_markers, signature, data} + for trg <- trgs, do: send trg, {:fwdxfer, dcd2} + end end) + loop(fconf, trgs, [(Floof.get_xfer_id dcd) | seen]) + + {:havewe, answpid, id} -> + send(answpid, {:havewe, id, Enum.any?(seen, fn x -> id == x end)}) + loop(fconf, trgs, seen) + + {:register, pid} -> + Process.link(pid) + loop(fconf, [pid | trgs], seen) + + {:EXIT, pid, _} -> + loop(fconf, List.delete(trgs, pid), seen) + end + end + + defp check_incoming(fconf, dcd) do + {:XferBlob, priority, ttl, markers, {:Signature, sigalgo, sigval}, xfinner} = dcd + case sigalgo do + {1, 3, 6, 1, 4, 1, 11591, 15, 1} -> + # Ed25519 + {:XferInner, _, source, _, _, _, _} = xfinner + case get_pubkey(fconf, source) do + {:ok, :ed25519, pubkey} -> + {:ok, xfienc} = :FloofProtocol.encode(:XferInner, xfinner) + cond do + not :enacl.sign_verify_detached(sigval, xfienc, pubkey) -> + Logger.error("packet signature invalid for #{inspect(xfinner)}") + false + :erlang.convert_time_unit(:erlang.system_time(), :native, :second) >= ttl -> + Logger.warn("packet in processing expired at #{ttl}: #{inspect(xfinner)}") + false + not check_filters(fconf, {:XferBlob, priority, ttl, markers, {:Signature, sigalgo, sigval}, xfinner}) -> + Logger.debug("packet discarded according to filters: #{inspect(xfinner)}") + false + true -> true + end + _ -> + Logger.warn("unable to verify packet signature due to unknown signer: #{inspect(source)}") + false + end + _ -> + Logger.warn("unable to verify packet signature due to unknown signature algo: #{inspect(sigalgo)}") + false + end + end + + defp check_filters(fconf, {:XferBlob, _, _, markers, _, {:XferInner, _, _, _, attrs, _, _}}) do + cond do + not check_filters_markers2(fconfig(fconf, :markers), markers) -> false + not check_filters_markers2(fconfig(fconf, :attrs ), attrs ) -> false + true -> true + end + end + + defp check_filters_markers1({true, mark}, [cmark | markers]) do + (mark == cmark) || check_filters_markers1({true, mark}, markers) + end + + defp check_filters_markers1({false, mark}, [cmark | markers]) do + (mark != cmark) && check_filters_markers1({false, mark}, markers) + end + + defp check_filters_markers1({xp, _}, []) do + not xp + end + + defp check_filters_markers2([xpm | fconf], markers) do + check_filters_markers1(xpm, markers) && check_filters_markers2(fconf, markers) + end + + defp check_filters_markers2([], _) do + true + end + + defp get_pubkey(fconf, source) do + keydb = fconfig(fconf, :keydir) + if keydb == "" do + :error + else + {:ok, encname} = :FloofProtocol.encode(:RDNSequence, source) + keypath = keydb <> "/" <> Base.url_encode64(encname) + case File.read(keypath) do + {:ok, data} -> {:ok, :ed25519, data} + {:error, _} -> :error + end + end + end +end diff --git a/mix.exs b/mix.exs index 5aef279..12f5d99 100644 --- a/mix.exs +++ b/mix.exs @@ -25,6 +25,7 @@ defmodule Floof.MixProject do defp deps do [ {:asn1ex, git: "https://github.com/vicentfg/asn1ex"}, + {:enacl, "~> 1.0.0"} # {:dep_from_hexpm, "~> 0.3.0"}, # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] diff --git a/mix.lock b/mix.lock index db6a19b..bde76cf 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,4 @@ %{ "asn1ex": {:git, "https://github.com/vicentfg/asn1ex", "0255348e2fffbdfd1eef7b46f71dc733318a36a0", []}, - "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "enacl": {:hex, :enacl, "1.0.0", "1bacae6c8aad1a2944ed5eb108e241d0d5e466819b685dfa690650abdff9bba0", [:rebar3], [], "hexpm", "308b38a8cb0d244c97d06b8da73abc318d73f576ab1e7c4999eaf0bbcf993bd2"}, } diff --git a/shell.nix b/shell.nix index 5ceff12..39ae1cd 100644 --- a/shell.nix +++ b/shell.nix @@ -7,5 +7,6 @@ erlpkgs.elixir erlpkgs.hex erlpkgs.rebar3 + pkgs.libsodium ]; }