increase reliability of sessions
This commit is contained in:
parent
7bbf59625a
commit
3ac8b8e07c
36
lib/floof.ex
36
lib/floof.ex
|
@ -79,8 +79,7 @@ defmodule Floof do
|
|||
end
|
||||
|
||||
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
|
||||
backlog = handle_incoming(client, backlog, sesskey, dcd)
|
||||
{backlog, sesskey}
|
||||
handle_incoming(client, backlog, sesskey, dcd)
|
||||
|
||||
{:fwdxfer, {dcdhash, dcd}} ->
|
||||
backlog =
|
||||
|
@ -99,8 +98,15 @@ defmodule Floof do
|
|||
:ok = send_summary(client, :requestpull, Floof.SessionManager.peek(sesskey))
|
||||
{backlog, sesskey}
|
||||
|
||||
{:SessionDetached, _} ->
|
||||
{backlog, nil}
|
||||
{:SessionDetached, old_sesskey} ->
|
||||
sesskey2 =
|
||||
if sesskey == old_sesskey do
|
||||
nil
|
||||
else
|
||||
sesskey
|
||||
end
|
||||
|
||||
{backlog, sesskey2}
|
||||
|
||||
{:tcp_closed, _} ->
|
||||
# put all the backlogged stuff back if possible
|
||||
|
@ -123,10 +129,14 @@ defmodule Floof do
|
|||
Logger.debug("got packet #{inspect(dcd)}")
|
||||
|
||||
case dcd do
|
||||
{:session, {:attach, sesskey}} ->
|
||||
Floof.SessionManager.attach(sesskey, self())
|
||||
transfer_backlog_to_session(sesskey, backlog)
|
||||
%{}
|
||||
{:session, {:attach, sesskey_new}} ->
|
||||
if sesskey != nil do
|
||||
Floof.SessionManager.detach(sesskey, self())
|
||||
end
|
||||
|
||||
Floof.SessionManager.attach(sesskey_new, self())
|
||||
transfer_backlog_to_session(sesskey_new, backlog)
|
||||
{%{}, sesskey_new}
|
||||
|
||||
{:summary, {:Summary, direction, hashes}} ->
|
||||
case direction do
|
||||
|
@ -134,7 +144,7 @@ defmodule Floof do
|
|||
{to_pull, to_drop} = partition_hashes(hashes)
|
||||
:ok = send_summary(client, :pull, to_pull)
|
||||
:ok = send_summary(client, :drop, to_drop)
|
||||
backlog
|
||||
{backlog, sesskey}
|
||||
|
||||
:pull ->
|
||||
for mhash <- hashes do
|
||||
|
@ -154,14 +164,14 @@ defmodule Floof do
|
|||
end
|
||||
end
|
||||
|
||||
backlog
|
||||
{backlog, sesskey}
|
||||
|
||||
:drop ->
|
||||
if sesskey != nil do
|
||||
Floof.SessionManager.drop(sesskey, hashes)
|
||||
end
|
||||
|
||||
Map.drop(backlog, hashes)
|
||||
{Map.drop(backlog, hashes), sesskey}
|
||||
end
|
||||
|
||||
{:xfer, xf} ->
|
||||
|
@ -169,7 +179,7 @@ defmodule Floof do
|
|||
# notify the remote end that we successfully consumed the message
|
||||
# non-fatal if this fails
|
||||
send_summary(client, :drop, [mhash])
|
||||
backlog
|
||||
{backlog, sesskey}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -185,7 +195,7 @@ defmodule Floof do
|
|||
|
||||
defp transfer_backlog_to_session(sesskey, backlog) do
|
||||
Floof.SessionManager.set_soft_multi(sesskey, for({dcdhash, _} <- backlog, do: dcdhash))
|
||||
Floof.PacketSpool.store_new_multi(backlog)
|
||||
:ok = Floof.PacketSpool.store_new_multi(backlog)
|
||||
end
|
||||
|
||||
defp partition_hashes(xs) do
|
||||
|
|
Loading…
Reference in a new issue