distrs/sessmgr: properly integrate sessions

This commit is contained in:
Alain Zscheile 2022-11-24 02:00:36 +01:00
parent 2f97c28add
commit f0d84d7817

View file

@ -16,10 +16,10 @@ defmodule Floof.Distributor do
def run(fconf) do
Process.flag(:trap_exit, true)
loop(fconf, [], [])
loop(fconf, MapSet.new(), %{}, MapSet.new())
end
defp loop(fconf, trgs, seen) do
defp loop(fconf, trgs, keys, seen) do
receive do
{:xfer, dcd} ->
# this makes sure that the messages don't block each other
@ -28,19 +28,28 @@ defmodule Floof.Distributor do
new_markers = Enum.uniq(fconfig(fconf, :set_markers) ++ old_markers)
dcd2 = {:XferBlob, source, priority, ttl, new_markers, signature, data}
for trg <- trgs, do: send trg, {:fwdxfer, dcd2}
for {key,_} <- keys, do: Floof.SessionManager.push(key, dcd2)
end end)
loop(fconf, trgs, [(Floof.get_xfer_id dcd) | seen])
loop(fconf, trgs, keys, MapSet.put(seen, Floof.get_xfer_id dcd))
{:havewe, answpid, id} ->
send(answpid, {:havewe, id, Enum.any?(seen, fn x -> id == x end)})
loop(fconf, trgs, seen)
loop(fconf, trgs, keys, seen)
{:register, pid} ->
Process.link(pid)
loop(fconf, [pid | trgs], seen)
loop(fconf, MapSet.put(trgs, pid), keys, seen)
{:registerKey, key, pid} ->
trgs = case Map.get(keys, key) do
nil -> trgs
oldpid -> MapSet.put(trgs, oldpid)
end
Process.link(pid)
loop(fconf, MapSet.delete(trgs, pid), Map.put(keys, key, pid), seen)
{:EXIT, pid, _} ->
loop(fconf, List.delete(trgs, pid), seen)
loop(fconf, MapSet.delete(trgs, pid), keys, seen)
end
end