batch calls to SessionManager and PacketSpool
This commit is contained in:
parent
5abd39c1e3
commit
d8f2dc3e66
20
lib/floof.ex
20
lib/floof.ex
|
@ -86,7 +86,7 @@ defmodule Floof do
|
|||
backlog =
|
||||
if sesskey != nil do
|
||||
# make sure that stuff gets handled uniformly
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.SessionManager.set_soft_multi(sesskey, [dcdhash])
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
else
|
||||
:ok = send_summary(client, :requestpull, [dcdhash])
|
||||
|
@ -105,11 +105,7 @@ defmodule Floof do
|
|||
{:tcp_closed, _} ->
|
||||
# put all the backlogged stuff back if possible
|
||||
if sesskey != nil do
|
||||
for {dcdhash, dcd} <- backlog do
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
end
|
||||
|
||||
transfer_backlog_to_session(sesskey, backlog)
|
||||
Floof.SessionManager.detach(sesskey, self())
|
||||
end
|
||||
|
||||
|
@ -129,12 +125,7 @@ defmodule Floof do
|
|||
case dcd do
|
||||
{:session, {:attach, sesskey}} ->
|
||||
Floof.SessionManager.attach(sesskey, self())
|
||||
|
||||
for {dcdhash, dcd} <- backlog do
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
end
|
||||
|
||||
transfer_backlog_to_session(sesskey, backlog)
|
||||
%{}
|
||||
|
||||
{:summary, {:Summary, direction, hashes}} ->
|
||||
|
@ -192,6 +183,11 @@ defmodule Floof do
|
|||
end
|
||||
end
|
||||
|
||||
defp transfer_backlog_to_session(sesskey, backlog) do
|
||||
Floof.SessionManager.set_soft_multi(sesskey, for({dcdhash, _} <- backlog, do: dcdhash))
|
||||
Floof.PacketSpool.store_new_multi(backlog)
|
||||
end
|
||||
|
||||
defp partition_hashes(xs) do
|
||||
xs
|
||||
|> Floof.DistributorSeen.have_we_multi()
|
||||
|
|
|
@ -21,10 +21,18 @@ defmodule Floof.PacketSpool do
|
|||
GenServer.call(__MODULE__, {:store, hash, packet, true})
|
||||
end
|
||||
|
||||
def store_multi(packets) do
|
||||
GenServer.call(__MODULE__, {:store, packets, true})
|
||||
end
|
||||
|
||||
def store_new(hash, packet) do
|
||||
GenServer.call(__MODULE__, {:store, hash, packet, false})
|
||||
end
|
||||
|
||||
def store_new_multi(packets) do
|
||||
GenServer.call(__MODULE__, {:store, packets, false})
|
||||
end
|
||||
|
||||
def fetch(hash) do
|
||||
GenServer.call(__MODULE__, {:fetch, hash})
|
||||
end
|
||||
|
@ -62,6 +70,33 @@ defmodule Floof.PacketSpool do
|
|||
)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:store, packets, overwrite}, _, state) do
|
||||
retval =
|
||||
try do
|
||||
packets = for {hash, packet} <- packets, do: {hash_to_path(hash, state), packet}
|
||||
|
||||
# prepare packets list, filter all packets which shouldn't be overwritten
|
||||
packets =
|
||||
if overwrite do
|
||||
packets
|
||||
else
|
||||
Enum.filter(packets, fn {path, _} -> File.exists?(path) end)
|
||||
end
|
||||
|
||||
for {path, packet} <- packets do
|
||||
{:ok, encd} = :FloofProtocol.encode(:XferBlob, packet)
|
||||
:ok = File.write(path, encd)
|
||||
end
|
||||
|
||||
:ok
|
||||
catch
|
||||
y, z -> {:error, y, z}
|
||||
end
|
||||
|
||||
{:reply, retval, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:fetch, hash}, _, state) do
|
||||
echain(
|
||||
|
|
|
@ -88,9 +88,9 @@ defmodule Floof.SessionManager do
|
|||
end)
|
||||
end
|
||||
|
||||
def set_soft(key, subkey) do
|
||||
def set_soft_multi(key, subkeys) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
:ok = :dets.insert(state.dets_file, {key, subkey})
|
||||
:ok = :dets.insert(state.dets_file, for(subkey <- subkeys, do: {key, subkey}))
|
||||
state
|
||||
end)
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue