refactor: SessionManager should use GenServer (prevents hard-to-avoid timeouts)
This commit is contained in:
parent
93f9852771
commit
5c5d2e3355
|
@ -145,38 +145,43 @@ defmodule Floof.PacketSpool do
|
|||
|
||||
@impl true
|
||||
def handle_info(:collect_garbage, state) do
|
||||
to_keep = Floof.SessionManager.get_keep_only()
|
||||
case GenServer.call(Floof.SessionManager, :get_keep_only) do
|
||||
:error ->
|
||||
nil
|
||||
|
||||
if to_keep != nil do
|
||||
{:ok, items} = File.ls(state)
|
||||
{:ok, to_keep} ->
|
||||
{:ok, items} = File.ls(state)
|
||||
|
||||
present =
|
||||
MapSet.new(
|
||||
Enum.flat_map(items, fn x ->
|
||||
case Base.url_decode64(x) do
|
||||
{:ok, y} ->
|
||||
[y]
|
||||
present =
|
||||
MapSet.new(
|
||||
Enum.flat_map(items, fn x ->
|
||||
case Base.url_decode64(x) do
|
||||
{:ok, y} ->
|
||||
[y]
|
||||
|
||||
{:error, e} ->
|
||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||
[]
|
||||
end
|
||||
end)
|
||||
)
|
||||
|
||||
to_keep = MapSet.new(to_keep)
|
||||
to_remove = MapSet.difference(present, to_keep)
|
||||
|
||||
if not Enum.empty?(to_remove) do
|
||||
Logger.debug("spool: to_remove #{inspect(to_remove)}")
|
||||
|
||||
for hash <- to_remove do
|
||||
case File.rm(hash_to_path(hash, state)) do
|
||||
:ok ->
|
||||
nil
|
||||
|
||||
{:error, e} ->
|
||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||
[]
|
||||
Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end)
|
||||
)
|
||||
|
||||
to_keep = MapSet.new(to_keep)
|
||||
to_remove = MapSet.difference(present, to_keep)
|
||||
|
||||
if not Enum.empty?(to_remove) do
|
||||
Logger.debug("spool: to_remove #{inspect(to_remove)}")
|
||||
|
||||
for hash <- to_remove do
|
||||
case File.rm(hash_to_path(hash, state)) do
|
||||
:ok -> nil
|
||||
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# do this again in 15 seconds
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
defmodule Floof.SessionManager do
|
||||
use Agent
|
||||
use GenServer
|
||||
require Logger
|
||||
require Record
|
||||
|
||||
|
@ -9,140 +9,160 @@ defmodule Floof.SessionManager do
|
|||
defstruct dets_file: :sessions, subs: %{}
|
||||
|
||||
def start_link({file, opts}) do
|
||||
Agent.start_link(
|
||||
fn ->
|
||||
{:ok, file} = :dets.open_file(file, opts ++ [type: :bag])
|
||||
%Floof.SessionManager{dets_file: file}
|
||||
end,
|
||||
name: __MODULE__
|
||||
)
|
||||
GenServer.start_link(__MODULE__, {file, opts}, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({file, opts}) do
|
||||
{:ok, file} = :dets.open_file(file, opts ++ [type: :bag])
|
||||
{:ok, %Floof.SessionManager{dets_file: file}}
|
||||
end
|
||||
|
||||
### client interface
|
||||
|
||||
def attach(key, pid) do
|
||||
modify_subscription(key, fn oldpid, dets_file ->
|
||||
if oldpid != pid do
|
||||
Floof.Distributor.unregister(pid)
|
||||
GenServer.cast(
|
||||
__MODULE__,
|
||||
{:modify_subscription, key,
|
||||
fn oldpid, dets_file ->
|
||||
if oldpid != pid do
|
||||
Floof.Distributor.unregister(pid)
|
||||
|
||||
if oldpid != nil do
|
||||
Floof.Distributor.register(oldpid)
|
||||
send(oldpid, {:SessionDetached, key})
|
||||
end
|
||||
end
|
||||
if oldpid != nil do
|
||||
Floof.Distributor.register(oldpid)
|
||||
send(oldpid, {:SessionDetached, key})
|
||||
end
|
||||
end
|
||||
|
||||
case :dets.lookup(dets_file, key) do
|
||||
[] -> nil
|
||||
{:error, _} -> nil
|
||||
_ -> send(pid, {:SessionPushed, key})
|
||||
end
|
||||
case :dets.lookup(dets_file, key) do
|
||||
[] -> nil
|
||||
{:error, _} -> nil
|
||||
_ -> send(pid, {:SessionPushed, key})
|
||||
end
|
||||
|
||||
pid
|
||||
end)
|
||||
pid
|
||||
end}
|
||||
)
|
||||
end
|
||||
|
||||
def detach(key, pid) do
|
||||
modify_subscription(key, fn oldpid, _ ->
|
||||
if oldpid == pid do
|
||||
send(pid, {:SessionDetached, key})
|
||||
nil
|
||||
else
|
||||
oldpid
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp modify_subscription(key, handler) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
{_, subs2} =
|
||||
Map.get_and_update(state.subs, key, fn oldpid ->
|
||||
{nil, handler.(oldpid, state.dets_file)}
|
||||
end)
|
||||
|
||||
%Floof.SessionManager{state | subs: subs2}
|
||||
end)
|
||||
GenServer.cast(
|
||||
__MODULE__,
|
||||
{:modify_subscription, key,
|
||||
fn oldpid, _ ->
|
||||
if oldpid == pid do
|
||||
send(pid, {:SessionDetached, key})
|
||||
nil
|
||||
else
|
||||
oldpid
|
||||
end
|
||||
end}
|
||||
)
|
||||
end
|
||||
|
||||
def set_for_all(subkey, value, origin) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
filter_keys =
|
||||
MapSet.new(
|
||||
for {key, sub} <- state.subs do
|
||||
if origin != sub and sub != nil do
|
||||
send(sub, {:SessionPushed, key})
|
||||
key
|
||||
else
|
||||
nil
|
||||
end
|
||||
end
|
||||
)
|
||||
|
||||
{:ok, all_keys} = all_session_keys(state)
|
||||
|
||||
# filter_keys contains all subscribed sessions
|
||||
# all_keys contains all non-empty sessions
|
||||
added_dets_ents =
|
||||
for key <- MapSet.union(MapSet.new(all_keys), filter_keys), do: {key, subkey}
|
||||
|
||||
if not Enum.empty?(added_dets_ents) do
|
||||
# only store packets when we have any sessions
|
||||
:ok = Floof.PacketSpool.store(subkey, value)
|
||||
:ok = :dets.insert(state.dets_file, added_dets_ents)
|
||||
end
|
||||
|
||||
state
|
||||
end)
|
||||
GenServer.cast(__MODULE__, {:set_for_all, subkey, value, origin})
|
||||
end
|
||||
|
||||
def set_soft_multi(key, subkeys) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
:ok = :dets.insert(state.dets_file, for(subkey <- subkeys, do: {key, subkey}))
|
||||
state
|
||||
end)
|
||||
GenServer.cast(__MODULE__, {:set_soft_multi, key, subkeys})
|
||||
end
|
||||
|
||||
def peek(key) do
|
||||
Agent.get(
|
||||
__MODULE__,
|
||||
&case :dets.lookup(&1.dets_file, key) do
|
||||
{:error, _} -> []
|
||||
items -> Enum.map(items, fn {_, x} -> x end)
|
||||
end,
|
||||
:infinity
|
||||
)
|
||||
GenServer.call(__MODULE__, {:peek, key})
|
||||
end
|
||||
|
||||
def drop(key, subkeys) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
dets_file = state.dets_file
|
||||
|
||||
for x <- subkeys do
|
||||
:ok = :dets.delete_object(dets_file, {key, x})
|
||||
end
|
||||
|
||||
state
|
||||
end)
|
||||
GenServer.cast(__MODULE__, {:drop, key, subkeys})
|
||||
end
|
||||
|
||||
def get_keep_only do
|
||||
Agent.get(
|
||||
__MODULE__,
|
||||
fn state ->
|
||||
case :dets.match(state.dets_file, {'_', '$1'}) do
|
||||
{:error, e} ->
|
||||
Logger.error("garbage collection error: #{inspect(e)}")
|
||||
nil
|
||||
@impl true
|
||||
def handle_cast({:modify_subscription, key, handler}, state) do
|
||||
{_, subs2} =
|
||||
Map.get_and_update(state.subs, key, fn oldpid ->
|
||||
{nil, handler.(oldpid, state.dets_file)}
|
||||
end)
|
||||
|
||||
keys ->
|
||||
for [key] <- keys, do: key
|
||||
{:noreply, %Floof.SessionManager{state | subs: subs2}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:set_for_all, subkey, value, origin}, state) do
|
||||
filter_keys =
|
||||
MapSet.new(
|
||||
for {key, sub} <- state.subs do
|
||||
if origin != sub and sub != nil do
|
||||
send(sub, {:SessionPushed, key})
|
||||
key
|
||||
else
|
||||
nil
|
||||
end
|
||||
end
|
||||
end,
|
||||
:infinity
|
||||
)
|
||||
)
|
||||
|
||||
{:ok, all_keys} = all_session_keys(state)
|
||||
|
||||
# filter_keys contains all subscribed sessions
|
||||
# all_keys contains all non-empty sessions
|
||||
added_dets_ents =
|
||||
for key <- MapSet.union(MapSet.new(all_keys), filter_keys), do: {key, subkey}
|
||||
|
||||
if not Enum.empty?(added_dets_ents) do
|
||||
# only store packets when we have any sessions
|
||||
:ok = Floof.PacketSpool.store(subkey, value)
|
||||
:ok = :dets.insert(state.dets_file, added_dets_ents)
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:set_soft_multi, key, subkeys}, state) do
|
||||
:ok = :dets.insert(state.dets_file, for(subkey <- subkeys, do: {key, subkey}))
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:drop, key, subkeys}, state) do
|
||||
dets_file = state.dets_file
|
||||
|
||||
for x <- subkeys do
|
||||
:ok = :dets.delete_object(dets_file, {key, x})
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:peek, key}, _, state) do
|
||||
{:reply,
|
||||
case :dets.lookup(state.dets_file, key) do
|
||||
{:error, _} -> []
|
||||
items -> Enum.map(items, fn {_, x} -> x end)
|
||||
end, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:get_keep_only, _, state) do
|
||||
{:reply,
|
||||
case :dets.match(state.dets_file, {'_', '$2'}) do
|
||||
{:error, e} ->
|
||||
Logger.error("garbage collection error: #{inspect(e)}")
|
||||
:error
|
||||
|
||||
keys ->
|
||||
{:ok, Enum.map(keys, fn [x] -> x end)}
|
||||
end, state}
|
||||
end
|
||||
|
||||
defp all_session_keys(state) do
|
||||
case :dets.select(state.dets_file, [{{'_', '_'}, [], ['$1']}]) do
|
||||
{:error, x} -> {:error, x}
|
||||
items -> {:ok, Enum.map(items, fn {x} -> x end)}
|
||||
case :dets.match(state.dets_file, {'_', '$1'}) do
|
||||
{:error, e} ->
|
||||
Logger.error("unable to fetch list of session keys: #{inspect(e)}")
|
||||
{:error, e}
|
||||
|
||||
keys ->
|
||||
{:ok, Enum.map(keys, fn [x] -> x end)}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue