allow subscribing to sessions

This commit is contained in:
Alain Zscheile 2022-11-24 01:33:43 +01:00
parent 3671033f6d
commit 2f97c28add
3 changed files with 58 additions and 10 deletions

View file

@ -60,6 +60,10 @@ XferBlob ::= SEQUENCE {
data OCTET STRING
}
SessionModify ::= CHOICE {
attach [0] OCTET STRING
}
SummaryDirection ::= ENUMERATED {
pull (0),
rejectpush (1),
@ -72,8 +76,9 @@ Summary ::= SEQUENCE {
}
ProtoMessage ::= CHOICE {
summary [0] Summary,
xfer [1] XferBlob
session [0] SessionModify,
summary [1] Summary,
xfer [2] XferBlob
}
END

View file

@ -31,8 +31,8 @@ defmodule Floof do
receive do
{:tcp, _, message} ->
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
backlog2 = handle_incoming(client, backlog, dcd)
serve(client, backlog2)
backlog = handle_incoming(client, backlog, dcd)
serve(client, backlog)
{:fwdxfer, dcdmessage} ->
dcdid = get_xfer_id dcdmessage
handle_outgoing(client, dcdid)
@ -42,6 +42,14 @@ defmodule Floof do
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
:ok = client.send(encd)
serve(client, backlog)
{:SessionPushed, key} ->
{backlog, added_ids} = extract_backlog(key, backlog, [])
for id <- added_ids do
dat = {:Summary, :requestpull, id}
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
:ok = client.send(encd)
end
serve(client, backlog)
x -> Logger.warn("unable to handle request: #{inspect(x)}")
serve(client, backlog)
end
@ -50,10 +58,17 @@ defmodule Floof do
defp handle_incoming(client, backlog, dcd) do
Logger.debug("got packet #{inspect(dcd)}")
case dcd do
{:session, {:SessionModify, {:attach, key}}} ->
if Floof.SessionManager.attach(key, self()) do
send self(), {:SessionPushed, key}
backlog
else
backlog
end
{:summary, {:Summary, direction, id}} ->
case direction do
:requestpull ->
send(Floof.Distributor, {:havewe, self(), id})
send Floof.Distributor, {:havewe, self(), id}
backlog
:pull ->
case List.keytake(backlog, id, 0) do
@ -67,7 +82,7 @@ defmodule Floof do
List.keydelete(backlog, id, 0)
end
{:xfer, xf} ->
send(Floof.Distributor, xf)
send Floof.Distributor, xf
backlog
end
end
@ -82,4 +97,12 @@ defmodule Floof do
{:ok, {:XferInner, id, _, _, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
id
end
defp extract_backlog(key, backlog, added_ids) do
case Floof.SessionManager.pop(key) do
:empty -> {backlog, added_ids}
{:value, {id, entry}} ->
extract_backlog(key, [{id, entry} | backlog], [id | added_ids])
end
end
end

View file

@ -5,11 +5,23 @@ defmodule Floof.SessionManager do
Agent.start_link(fn -> initial_value end, name: __MODULE__)
end
def attach(key, pid) do
Agent.get_and_update(__MODULE__, fn state ->
Map.get_and_update(state, key, fn q ->
{_, q2} = entry_dfl(q)
{:queue.is_empty(q2), {pid, q2}}
end)
end)
end
def push(key, value) do
Agent.cast(__MODULE__, fn state ->
{_, state2} = Map.get_and_update(state, key, fn q ->
q2 = if q == nil do :queue.new() else q end
{nil, :queue.cons(value, q2)} end)
{sub, state2} = Map.get_and_update(state, key, fn q ->
{sub, q2} = entry_dfl(q)
{sub, {sub, :queue.cons(value, q2)}} end)
if sub != nil do
send(sub, {:SessionPushed, key})
end
state2
end)
end
@ -17,7 +29,11 @@ defmodule Floof.SessionManager do
def pop(key) do
Agent.get_and_update(__MODULE__, fn state ->
{tmp, state} = Map.get_and_update(state, key, fn q ->
if q == nil do :pop else :queue.out_r(q) end
if q == nil do :pop else
{sub, q2} = q
{got, q3} = :queue.out_r(q2)
{got, {sub, q3}}
end
end)
retval = case tmp do
nil -> :empty
@ -26,4 +42,8 @@ defmodule Floof.SessionManager do
{retval, state}
end, :infinity)
end
defp entry_dfl(got) do
if got == nil do {nil, :queue.new()} else got end
end
end