put backlogged stuff properly back into session to prevent message leak
This commit is contained in:
parent
3b9aa3f301
commit
2ed3e90018
2 changed files with 35 additions and 9 deletions
24
lib/floof.ex
24
lib/floof.ex
|
@ -27,7 +27,7 @@ defmodule Floof do
|
||||||
|
|
||||||
{:ok, pid} =
|
{:ok, pid} =
|
||||||
Task.Supervisor.start_child(Floof.TaskSupervisor, fn ->
|
Task.Supervisor.start_child(Floof.TaskSupervisor, fn ->
|
||||||
serve(client, %{})
|
serve(client, %{}, nil)
|
||||||
:ok = :gen_tcp.close(client)
|
:ok = :gen_tcp.close(client)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
@ -47,33 +47,39 @@ defmodule Floof do
|
||||||
|
|
||||||
Floof.Distributor.register(self())
|
Floof.Distributor.register(self())
|
||||||
Logger.info("Established connection to #{host}:#{port}")
|
Logger.info("Established connection to #{host}:#{port}")
|
||||||
serve(socket, %{})
|
serve(socket, %{}, nil)
|
||||||
:ok = :gen_tcp.close(socket)
|
:ok = :gen_tcp.close(socket)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp serve(client, backlog) do
|
defp serve(client, backlog, sesskey) do
|
||||||
receive do
|
receive do
|
||||||
{:tcp, _, message} ->
|
{:tcp, _, message} ->
|
||||||
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
|
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
|
||||||
backlog = handle_incoming(client, backlog, dcd)
|
backlog = handle_incoming(client, backlog, dcd)
|
||||||
serve(client, backlog)
|
serve(client, backlog, sesskey)
|
||||||
|
|
||||||
{:fwdxfer, {dcdhash, dcd}} ->
|
{:fwdxfer, {dcdhash, dcd}} ->
|
||||||
:ok = send_summary(client, :requestpull, [dcdhash])
|
:ok = send_summary(client, :requestpull, [dcdhash])
|
||||||
serve(client, Map.put(backlog, dcdhash, dcd))
|
serve(client, Map.put(backlog, dcdhash, dcd), sesskey)
|
||||||
|
|
||||||
{:SessionPushed, key} ->
|
{:SessionPushed, key} ->
|
||||||
{backlog, added_hashs} = extract_backlog(key, backlog, MapSet.new())
|
{backlog, added_hashs} = extract_backlog(key, backlog, MapSet.new())
|
||||||
:ok = send_summary(client, :requestpull, added_hashs)
|
:ok = send_summary(client, :requestpull, added_hashs)
|
||||||
serve(client, backlog)
|
serve(client, backlog, key)
|
||||||
|
|
||||||
{:tcp_closed, _} ->
|
{:tcp_closed, _} ->
|
||||||
# TODO: we need to put all the backlogged stuff back...
|
# put all the backlogged stuff back if possible
|
||||||
nil
|
if sesskey != nil do
|
||||||
|
for item <- backlog do
|
||||||
|
Floof.SessionManager.push(sesskey, item)
|
||||||
|
end
|
||||||
|
|
||||||
|
Floof.SessionManager.deattach(sesskey, self())
|
||||||
|
end
|
||||||
|
|
||||||
x ->
|
x ->
|
||||||
Logger.warn("unable to handle request: #{inspect(x)}")
|
Logger.warn("unable to handle request: #{inspect(x)}")
|
||||||
serve(client, backlog)
|
serve(client, backlog, sesskey)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,26 @@ defmodule Floof.SessionManager do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def deattach(key, pid) do
|
||||||
|
Agent.cast(__MODULE__, fn state ->
|
||||||
|
{_, state2} =
|
||||||
|
Map.get_and_update(state, key, fn q ->
|
||||||
|
{sub, q2} = entry_dfl(q)
|
||||||
|
|
||||||
|
sub2 =
|
||||||
|
if sub == pid do
|
||||||
|
nil
|
||||||
|
else
|
||||||
|
sub
|
||||||
|
end
|
||||||
|
|
||||||
|
{nil, {sub2, q2}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
state2
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
def push(key, value) do
|
def push(key, value) do
|
||||||
Agent.cast(__MODULE__, fn state ->
|
Agent.cast(__MODULE__, fn state ->
|
||||||
{sub, state2} =
|
{sub, state2} =
|
||||||
|
|
Loading…
Reference in a new issue