provide hard backpressure in distribution
This commit is contained in:
parent
c0a210fa6e
commit
eb59c9b9ae
|
@ -58,7 +58,7 @@ defmodule Floof.Distributor do
|
|||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:xfer, {origin, dcdhash, dcd}}, _, state) do
|
||||
def handle_call({:xfer, origin, dcdhash, dcd}, _, state) do
|
||||
# we split the state such that only the parts necessary get
|
||||
# copied into the spawned-off process
|
||||
fconf = dstate(state, :fconf)
|
||||
|
@ -69,34 +69,32 @@ defmodule Floof.Distributor do
|
|||
|
||||
# spawn a separate process to avoid blocking the queue
|
||||
# we only block the queue above for the check + seen marker
|
||||
spawn_link(fn ->
|
||||
{:XferBlob, ttl, old_markers, signature, data} = dcd
|
||||
set_markers = fconfig(fconf, :set_markers)
|
||||
new_markers = Enum.uniq(set_markers ++ old_markers)
|
||||
dcd = {:XferBlob, ttl - 1, new_markers, signature, data}
|
||||
{:XferBlob, ttl, old_markers, signature, data} = dcd
|
||||
set_markers = fconfig(fconf, :set_markers)
|
||||
new_markers = Enum.uniq(set_markers ++ old_markers)
|
||||
dcd = {:XferBlob, ttl - 1, new_markers, signature, data}
|
||||
|
||||
dcd =
|
||||
Enum.reduce(
|
||||
fconfig(fconf, :extra_filters),
|
||||
dcd,
|
||||
fn f, dcdx ->
|
||||
case dcdx do
|
||||
nil -> nil
|
||||
dcdx -> f.(dcdx)
|
||||
end
|
||||
dcd =
|
||||
Enum.reduce(
|
||||
fconfig(fconf, :extra_filters),
|
||||
dcd,
|
||||
fn f, dcdx ->
|
||||
case dcdx do
|
||||
nil -> nil
|
||||
dcdx -> f.(dcdx)
|
||||
end
|
||||
)
|
||||
|
||||
if dcd != nil do
|
||||
for trg <- trgs do
|
||||
send(trg, {:fwdxfer, {dcdhash, dcd}})
|
||||
end
|
||||
)
|
||||
|
||||
if ttl > 1 do
|
||||
Floof.SessionManager.set_for_all(dcdhash, dcd, origin)
|
||||
end
|
||||
if dcd != nil do
|
||||
for trg <- trgs do
|
||||
send(trg, {:fwdxfer, {dcdhash, dcd}})
|
||||
end
|
||||
end)
|
||||
|
||||
if ttl > 1 do
|
||||
Floof.SessionManager.set_for_all(dcdhash, dcd, origin)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
{:reply, :ok, state}
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule Floof.Message do
|
|||
def emit(xf) do
|
||||
{:XferBlob, _, _, _, xfinner} = xf
|
||||
mhash = :crypto.hash(:blake2b, xfinner)
|
||||
GenServer.call(Floof.Distributor, {:xfer, {self(), mhash, xf}})
|
||||
GenServer.call(Floof.Distributor, {:xfer, self(), mhash, xf}, 10000)
|
||||
mhash
|
||||
end
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ defmodule Floof.SessionManager do
|
|||
end
|
||||
|
||||
def set_for_all(subkey, value, origin) do
|
||||
GenServer.cast(__MODULE__, {:set_for_all, subkey, value, origin})
|
||||
GenServer.call(__MODULE__, {:set_for_all, subkey, value, origin})
|
||||
end
|
||||
|
||||
def set_soft_multi(key, subkeys) do
|
||||
|
@ -78,17 +78,7 @@ defmodule Floof.SessionManager do
|
|||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:modify_subscription, key, handler}, state) do
|
||||
{_, subs2} =
|
||||
Map.get_and_update(state.subs, key, fn oldpid ->
|
||||
{nil, handler.(oldpid, state.dets_file)}
|
||||
end)
|
||||
|
||||
{:noreply, %Floof.SessionManager{state | subs: subs2}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:set_for_all, subkey, value, origin}, state) do
|
||||
def handle_call({:set_for_all, subkey, value, origin}, _, state) do
|
||||
filter_keys =
|
||||
MapSet.new(
|
||||
for {key, sub} <- state.subs do
|
||||
|
@ -115,7 +105,26 @@ defmodule Floof.SessionManager do
|
|||
:ok = :dets.insert(state.dets_file, added_dets_ents)
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
{:reply, :ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:peek, key}, _, state) do
|
||||
{:reply,
|
||||
case :dets.lookup(state.dets_file, key) do
|
||||
{:error, _} -> []
|
||||
items -> Enum.map(items, fn {_, x} -> x end)
|
||||
end, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:modify_subscription, key, handler}, state) do
|
||||
{_, subs2} =
|
||||
Map.get_and_update(state.subs, key, fn oldpid ->
|
||||
{nil, handler.(oldpid, state.dets_file)}
|
||||
end)
|
||||
|
||||
{:noreply, %Floof.SessionManager{state | subs: subs2}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
@ -135,15 +144,6 @@ defmodule Floof.SessionManager do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:peek, key}, _, state) do
|
||||
{:reply,
|
||||
case :dets.lookup(state.dets_file, key) do
|
||||
{:error, _} -> []
|
||||
items -> Enum.map(items, fn {_, x} -> x end)
|
||||
end, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:collect_garbage, state) do
|
||||
case :dets.match(state.dets_file, {'_', '$2'}) do
|
||||
|
|
Loading…
Reference in a new issue