cache get_xfer_id calls
This commit is contained in:
parent
83ae614581
commit
39c2930b52
20
lib/floof.ex
20
lib/floof.ex
|
@ -33,10 +33,9 @@ defmodule Floof do
|
|||
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
|
||||
backlog = handle_incoming(client, backlog, dcd)
|
||||
serve(client, backlog)
|
||||
{:fwdxfer, dcdmessage} ->
|
||||
dcdid = get_xfer_id dcdmessage
|
||||
{:fwdxfer, {dcdid, dcd}} ->
|
||||
handle_outgoing(client, dcdid)
|
||||
serve(client, [{dcdid, dcdmessage} | backlog])
|
||||
serve(client, [{dcdid, dcd} | backlog])
|
||||
{:havewe, id, res} ->
|
||||
dat = {:Summary, if res do :rejectpush else :pull end, id}
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
|
||||
|
@ -61,10 +60,8 @@ defmodule Floof do
|
|||
{:session, {:SessionModify, {:attach, key}}} ->
|
||||
if Floof.SessionManager.attach(key, self()) do
|
||||
send self(), {:SessionPushed, key}
|
||||
backlog
|
||||
else
|
||||
backlog
|
||||
end
|
||||
backlog
|
||||
{:summary, {:Summary, direction, id}} ->
|
||||
case direction do
|
||||
:requestpull ->
|
||||
|
@ -73,7 +70,7 @@ defmodule Floof do
|
|||
:pull ->
|
||||
case List.keytake(backlog, id, 0) do
|
||||
{{_, item}, backlog2} ->
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
|
||||
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, {id, item}})
|
||||
:ok = client.send(encd)
|
||||
backlog2
|
||||
nil -> backlog
|
||||
|
@ -82,7 +79,9 @@ defmodule Floof do
|
|||
List.keydelete(backlog, id, 0)
|
||||
end
|
||||
{:xfer, xf} ->
|
||||
send Floof.Distributor, xf
|
||||
{:XferBlob, _, _, _, _, _, xfinner} = xf
|
||||
{:ok, {:XferInner, id, _, _, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
|
||||
send Floof.Distributor, {:xfer, {id, xf}}
|
||||
backlog
|
||||
end
|
||||
end
|
||||
|
@ -93,11 +92,6 @@ defmodule Floof do
|
|||
:ok = client.send(encd)
|
||||
end
|
||||
|
||||
def get_xfer_id({:XferBlob, _, _, _, _, _, xfinner}) do
|
||||
{:ok, {:XferInner, id, _, _, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
|
||||
id
|
||||
end
|
||||
|
||||
defp extract_backlog(key, backlog, added_ids) do
|
||||
case Floof.SessionManager.pop(key) do
|
||||
:empty -> {backlog, added_ids}
|
||||
|
|
|
@ -21,16 +21,16 @@ defmodule Floof.Distributor do
|
|||
|
||||
defp loop(fconf, trgs, keys, seen) do
|
||||
receive do
|
||||
{:xfer, dcd} ->
|
||||
{:xfer, {dcdid, dcd}} ->
|
||||
# this makes sure that the messages don't block each other
|
||||
spawn_link(fn -> if check_incoming(fconf, dcd) do
|
||||
{:XferBlob, source, priority, ttl, old_markers, signature, data} = dcd
|
||||
new_markers = Enum.uniq(fconfig(fconf, :set_markers) ++ old_markers)
|
||||
dcd2 = {:XferBlob, source, priority, ttl, new_markers, signature, data}
|
||||
for trg <- trgs, do: send trg, {:fwdxfer, dcd2}
|
||||
for {key,_} <- keys, do: Floof.SessionManager.push(key, dcd2)
|
||||
for trg <- trgs, do: send trg, {:fwdxfer, {dcdid, dcd2}}
|
||||
for {key,_} <- keys, do: Floof.SessionManager.push(key, {dcdid, dcd2})
|
||||
end end)
|
||||
loop(fconf, trgs, keys, MapSet.put(seen, Floof.get_xfer_id dcd))
|
||||
loop(fconf, trgs, keys, MapSet.put(seen, dcdid))
|
||||
|
||||
{:havewe, answpid, id} ->
|
||||
send(answpid, {:havewe, id, MapSet.member?(seen, id)})
|
||||
|
|
Loading…
Reference in a new issue