From 2f97c28add27dcc6c3d26fd65323f0a0f3f77fe8 Mon Sep 17 00:00:00 2001 From: Alain Zscheile Date: Thu, 24 Nov 2022 01:33:43 +0100 Subject: [PATCH] allow subscribing to sessions --- asn1/FloofProtocol.asn1 | 9 +++++++-- lib/floof.ex | 31 +++++++++++++++++++++++++++---- lib/floof/session_manager.ex | 28 ++++++++++++++++++++++++---- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/asn1/FloofProtocol.asn1 b/asn1/FloofProtocol.asn1 index 9bcfcd3..052546e 100644 --- a/asn1/FloofProtocol.asn1 +++ b/asn1/FloofProtocol.asn1 @@ -60,6 +60,10 @@ XferBlob ::= SEQUENCE { data OCTET STRING } +SessionModify ::= CHOICE { + attach [0] OCTET STRING +} + SummaryDirection ::= ENUMERATED { pull (0), rejectpush (1), @@ -72,8 +76,9 @@ Summary ::= SEQUENCE { } ProtoMessage ::= CHOICE { - summary [0] Summary, - xfer [1] XferBlob + session [0] SessionModify, + summary [1] Summary, + xfer [2] XferBlob } END diff --git a/lib/floof.ex b/lib/floof.ex index f095b0f..870bc1e 100644 --- a/lib/floof.ex +++ b/lib/floof.ex @@ -31,8 +31,8 @@ defmodule Floof do receive do {:tcp, _, message} -> {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message) - backlog2 = handle_incoming(client, backlog, dcd) - serve(client, backlog2) + backlog = handle_incoming(client, backlog, dcd) + serve(client, backlog) {:fwdxfer, dcdmessage} -> dcdid = get_xfer_id dcdmessage handle_outgoing(client, dcdid) @@ -42,6 +42,14 @@ defmodule Floof do {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat}) :ok = client.send(encd) serve(client, backlog) + {:SessionPushed, key} -> + {backlog, added_ids} = extract_backlog(key, backlog, []) + for id <- added_ids do + dat = {:Summary, :requestpull, id} + {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat}) + :ok = client.send(encd) + end + serve(client, backlog) x -> Logger.warn("unable to handle request: #{inspect(x)}") serve(client, backlog) end @@ -50,10 +58,17 @@ defmodule Floof do defp handle_incoming(client, backlog, dcd) do Logger.debug("got packet #{inspect(dcd)}") case dcd do + {:session, {:SessionModify, {:attach, key}}} -> + if Floof.SessionManager.attach(key, self()) do + send self(), {:SessionPushed, key} + backlog + else + backlog + end {:summary, {:Summary, direction, id}} -> case direction do :requestpull -> - send(Floof.Distributor, {:havewe, self(), id}) + send Floof.Distributor, {:havewe, self(), id} backlog :pull -> case List.keytake(backlog, id, 0) do @@ -67,7 +82,7 @@ defmodule Floof do List.keydelete(backlog, id, 0) end {:xfer, xf} -> - send(Floof.Distributor, xf) + send Floof.Distributor, xf backlog end end @@ -82,4 +97,12 @@ defmodule Floof do {:ok, {:XferInner, id, _, _, _, _}} = :FloofProtocol.decode(:XferInner, xfinner) id end + + defp extract_backlog(key, backlog, added_ids) do + case Floof.SessionManager.pop(key) do + :empty -> {backlog, added_ids} + {:value, {id, entry}} -> + extract_backlog(key, [{id, entry} | backlog], [id | added_ids]) + end + end end diff --git a/lib/floof/session_manager.ex b/lib/floof/session_manager.ex index b7d4852..59302a6 100644 --- a/lib/floof/session_manager.ex +++ b/lib/floof/session_manager.ex @@ -5,11 +5,23 @@ defmodule Floof.SessionManager do Agent.start_link(fn -> initial_value end, name: __MODULE__) end + def attach(key, pid) do + Agent.get_and_update(__MODULE__, fn state -> + Map.get_and_update(state, key, fn q -> + {_, q2} = entry_dfl(q) + {:queue.is_empty(q2), {pid, q2}} + end) + end) + end + def push(key, value) do Agent.cast(__MODULE__, fn state -> - {_, state2} = Map.get_and_update(state, key, fn q -> - q2 = if q == nil do :queue.new() else q end - {nil, :queue.cons(value, q2)} end) + {sub, state2} = Map.get_and_update(state, key, fn q -> + {sub, q2} = entry_dfl(q) + {sub, {sub, :queue.cons(value, q2)}} end) + if sub != nil do + send(sub, {:SessionPushed, key}) + end state2 end) end @@ -17,7 +29,11 @@ defmodule Floof.SessionManager do def pop(key) do Agent.get_and_update(__MODULE__, fn state -> {tmp, state} = Map.get_and_update(state, key, fn q -> - if q == nil do :pop else :queue.out_r(q) end + if q == nil do :pop else + {sub, q2} = q + {got, q3} = :queue.out_r(q2) + {got, {sub, q3}} + end end) retval = case tmp do nil -> :empty @@ -26,4 +42,8 @@ defmodule Floof.SessionManager do {retval, state} end, :infinity) end + + defp entry_dfl(got) do + if got == nil do {nil, :queue.new()} else got end + end end