make garbage collection periodic to avoid bursts
This commit is contained in:
parent
355d559195
commit
93f9852771
|
@ -41,16 +41,15 @@ defmodule Floof.PacketSpool do
|
||||||
GenServer.cast(__MODULE__, {:drop, hash})
|
GenServer.cast(__MODULE__, {:drop, hash})
|
||||||
end
|
end
|
||||||
|
|
||||||
def keep_only(hashes) do
|
|
||||||
# we need backpressure here
|
|
||||||
GenServer.call(__MODULE__, {:keep_only, hashes})
|
|
||||||
end
|
|
||||||
|
|
||||||
### server interface
|
### server interface
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def init(state) do
|
def init(state) do
|
||||||
:ok = File.mkdir_p(state)
|
:ok = File.mkdir_p(state)
|
||||||
|
|
||||||
|
# in 5 seconds
|
||||||
|
Process.send_after(self(), :collect_garbage, 5 * 1000)
|
||||||
|
|
||||||
{:ok, state}
|
{:ok, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -134,41 +133,6 @@ defmodule Floof.PacketSpool do
|
||||||
{:reply, retval, state}
|
{:reply, retval, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
|
||||||
def handle_call({:keep_only, hashes}, _, state) do
|
|
||||||
{:ok, items} = File.ls(state)
|
|
||||||
|
|
||||||
present =
|
|
||||||
MapSet.new(
|
|
||||||
Enum.flat_map(items, fn x ->
|
|
||||||
case Base.url_decode64(x) do
|
|
||||||
{:ok, y} ->
|
|
||||||
[y]
|
|
||||||
|
|
||||||
{:error, e} ->
|
|
||||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
|
||||||
[]
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
)
|
|
||||||
|
|
||||||
hashes = MapSet.new(hashes)
|
|
||||||
to_remove = MapSet.difference(present, hashes)
|
|
||||||
|
|
||||||
if not Enum.empty?(to_remove) do
|
|
||||||
Logger.debug("spool: to_remove #{inspect(to_remove)}")
|
|
||||||
|
|
||||||
for hash <- MapSet.difference(present, hashes) do
|
|
||||||
case File.rm(hash_to_path(hash, state)) do
|
|
||||||
:ok -> nil
|
|
||||||
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
{:reply, :ok, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_cast({:drop, hash}, state) do
|
def handle_cast({:drop, hash}, state) do
|
||||||
case File.rm(hash_to_path(hash, state)) do
|
case File.rm(hash_to_path(hash, state)) do
|
||||||
|
@ -179,6 +143,48 @@ defmodule Floof.PacketSpool do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_info(:collect_garbage, state) do
|
||||||
|
to_keep = Floof.SessionManager.get_keep_only()
|
||||||
|
|
||||||
|
if to_keep != nil do
|
||||||
|
{:ok, items} = File.ls(state)
|
||||||
|
|
||||||
|
present =
|
||||||
|
MapSet.new(
|
||||||
|
Enum.flat_map(items, fn x ->
|
||||||
|
case Base.url_decode64(x) do
|
||||||
|
{:ok, y} ->
|
||||||
|
[y]
|
||||||
|
|
||||||
|
{:error, e} ->
|
||||||
|
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
)
|
||||||
|
|
||||||
|
to_keep = MapSet.new(to_keep)
|
||||||
|
to_remove = MapSet.difference(present, to_keep)
|
||||||
|
|
||||||
|
if not Enum.empty?(to_remove) do
|
||||||
|
Logger.debug("spool: to_remove #{inspect(to_remove)}")
|
||||||
|
|
||||||
|
for hash <- to_remove do
|
||||||
|
case File.rm(hash_to_path(hash, state)) do
|
||||||
|
:ok -> nil
|
||||||
|
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# do this again in 15 seconds
|
||||||
|
Process.send_after(self(), :collect_garbage, 15 * 1000)
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
|
||||||
# internal interface
|
# internal interface
|
||||||
|
|
||||||
defp hash_to_path(hash, state) do
|
defp hash_to_path(hash, state) do
|
||||||
|
|
|
@ -118,19 +118,27 @@ defmodule Floof.SessionManager do
|
||||||
:ok = :dets.delete_object(dets_file, {key, x})
|
:ok = :dets.delete_object(dets_file, {key, x})
|
||||||
end
|
end
|
||||||
|
|
||||||
# garbage collect unused objects
|
|
||||||
case :dets.match(dets_file, {'_', '$1'}) do
|
|
||||||
{:error, e} ->
|
|
||||||
Logger.error("garbage collection error: #{inspect(e)}")
|
|
||||||
|
|
||||||
keys ->
|
|
||||||
Floof.PacketSpool.keep_only(for [key] <- keys, do: key)
|
|
||||||
end
|
|
||||||
|
|
||||||
state
|
state
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def get_keep_only do
|
||||||
|
Agent.get(
|
||||||
|
__MODULE__,
|
||||||
|
fn state ->
|
||||||
|
case :dets.match(state.dets_file, {'_', '$1'}) do
|
||||||
|
{:error, e} ->
|
||||||
|
Logger.error("garbage collection error: #{inspect(e)}")
|
||||||
|
nil
|
||||||
|
|
||||||
|
keys ->
|
||||||
|
for [key] <- keys, do: key
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
:infinity
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
defp all_session_keys(state) do
|
defp all_session_keys(state) do
|
||||||
case :dets.select(state.dets_file, [{{'_', '_'}, [], ['$1']}]) do
|
case :dets.select(state.dets_file, [{{'_', '_'}, [], ['$1']}]) do
|
||||||
{:error, x} -> {:error, x}
|
{:error, x} -> {:error, x}
|
||||||
|
|
Loading…
Reference in a new issue