floof/lib/floof/packet_spool.ex
2022-11-28 00:37:45 +01:00

205 lines
4.7 KiB
Elixir

defmodule Floof.PacketSpool do
  @moduledoc """
  Disk-backed packet spooling management.

  Saves packets to disk, and manages encoding/decoding.
  The user is encouraged to use a backing disk with long commit times
  to improve performance and avoid disk churn.

  Entries are stored one-per-file in the spool directory, with the file
  name being the URL-safe Base64 encoding of the packet hash.
  """
  use GenServer
  require Logger

  @doc """
  Starts the spool server, registered under `#{inspect(__MODULE__)}`.

  `initval` is the spool directory path; it is created (including parents)
  during `init/1` if it does not exist yet.
  """
  def start_link(initval \\ "/var/spool/floof") do
    GenServer.start_link(__MODULE__, initval, name: __MODULE__)
  end

  ### client interface
  ## packet is expected to be of type :XferBlob

  @doc "Stores `packet` under `hash`, overwriting any existing entry."
  def store(hash, packet) do
    GenServer.call(__MODULE__, {:store, hash, packet, true})
  end

  @doc "Stores each `{hash, packet}` pair in `packets`, overwriting existing entries."
  def store_multi(packets) do
    GenServer.call(__MODULE__, {:store, packets, true})
  end

  @doc "Stores `packet` under `hash` only if no entry for `hash` exists yet."
  def store_new(hash, packet) do
    GenServer.call(__MODULE__, {:store, hash, packet, false})
  end

  @doc "Stores each `{hash, packet}` pair in `packets`, skipping hashes already present."
  def store_new_multi(packets) do
    GenServer.call(__MODULE__, {:store, packets, false})
  end

  @doc """
  Fetches and decodes the packet stored under `hash`.

  Returns `{:ok, data}` on success, `{:error, reason}` if the file cannot
  be read (e.g. `{:error, :enoent}` when absent).
  """
  def fetch(hash) do
    GenServer.call(__MODULE__, {:fetch, hash})
  end

  @doc "Asynchronously removes the entry stored under `hash`."
  def drop(hash) do
    GenServer.cast(__MODULE__, {:drop, hash})
  end

  ### server interface

  @impl true
  def init(state) do
    # The state is simply the spool directory path; ensure it exists,
    # crashing on failure (a spool without a directory is useless).
    :ok = File.mkdir_p(state)
    {:ok, state}
  end

  # Single-packet store. With overwrite: false an existing entry is left
  # untouched; otherwise the packet is encoded and written to disk.
  @impl true
  def handle_call({:store, hash, packet, overwrite}, _, state) do
    path = hash_to_path(hash, state)

    echain(
      state,
      fn ->
        if not overwrite and File.exists?(path) do
          # a bare :ok does not match {:ok, encd} inside echain, so it is
          # passed through as the reply without writing anything
          :ok
        else
          :FloofProtocol.encode(:XferBlob, packet)
        end
      end,
      &File.write(path, &1)
    )
  end

  # Multi-packet store; `packets` is an enumerable of {hash, packet} pairs.
  @impl true
  def handle_call({:store, packets, overwrite}, _, state) do
    retval =
      try do
        packets = for {hash, packet} <- packets, do: {hash_to_path(hash, state), packet}

        # prepare packets list, filter all packets which shouldn't be overwritten
        # BUGFIX: this previously used Enum.filter/2, which kept exactly the
        # already-existing entries — i.e. store_new_multi/1 overwrote existing
        # packets and dropped new ones. Enum.reject/2 keeps only new entries.
        packets =
          if overwrite do
            packets
          else
            Enum.reject(packets, fn {path, _} -> File.exists?(path) end)
          end

        for {path, packet} <- packets do
          {:ok, encd} = :FloofProtocol.encode(:XferBlob, packet)
          :ok = File.write(path, encd)
        end

        :ok
      catch
        y, z -> {:error, y, z}
      end

    {:reply, retval, state}
  end

  @impl true
  def handle_call({:fetch, hash}, _, state) do
    retval =
      try do
        path = hash_to_path(hash, state)

        case File.read(path) do
          {:ok, encd} ->
            case :FloofProtocol.decode(:XferBlob, encd) do
              {:ok, data} ->
                {:ok, data}

              {:error, e} ->
                # undecodable entries are treated as corrupt and removed
                Logger.error(
                  "spool: invalid content in object #{inspect(hash)}, not decodable: #{inspect(e)} -> deleting"
                )

                # NOTE(review): the reply in this branch is nil (or :ok when
                # File.rm fails), not an {:error, _} tuple — callers must not
                # assume a tagged tuple on decode failure; confirm before
                # normalizing this return shape.
                case File.rm(path) do
                  :ok ->
                    nil

                  {:error, e} ->
                    Logger.error("spool: unable to delete object #{inspect(hash)}: #{inspect(e)}")
                end
            end

          # pass File.read errors (e.g. {:error, :enoent}) through untouched
          x ->
            x
        end
      catch
        y, z -> {:error, y, z}
      end

    {:reply, retval, state}
  end

  # Garbage collection: delete every spooled entry whose hash is not in
  # `to_keep`. Entries with undecodable file names are ignored (warned about),
  # never deleted.
  @impl true
  def handle_cast({:collect_garbage_keep_only, to_keep}, state) do
    {:ok, items} = File.ls(state)

    # decode file names back into raw hashes, dropping unparseable entries
    present =
      MapSet.new(
        Enum.flat_map(items, fn x ->
          case Base.url_decode64(x) do
            {:ok, y} ->
              [y]

            {:error, e} ->
              Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
              []
          end
        end)
      )

    to_keep = MapSet.new(to_keep)
    to_remove = MapSet.difference(present, to_keep)

    if not Enum.empty?(to_remove) do
      Logger.debug(
        "spool: removing #{MapSet.size(to_remove)} / keeping #{MapSet.size(to_keep)} / available #{MapSet.size(present)} entries"
      )

      for hash <- to_remove do
        case File.rm(hash_to_path(hash, state)) do
          :ok ->
            nil

          {:error, e} ->
            Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
        end
      end
    end

    # this might seem counter-intuitive, but it prevents unnecessary timeouts
    Process.send_after(Floof.SessionManager, :collect_garbage, 15 * 1000)
    {:noreply, state}
  end

  @impl true
  def handle_cast({:drop, hash}, state) do
    # best-effort delete: a failure is logged but does not crash the spool
    case File.rm(hash_to_path(hash, state)) do
      :ok -> nil
      {:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
    end

    {:noreply, state}
  end

  # internal interface

  # Maps a raw hash to its on-disk path inside the spool directory,
  # using URL-safe Base64 so arbitrary hash bytes yield valid file names.
  defp hash_to_path(hash, state) do
    state <> "/" <> Base.url_encode64(hash)
  end

  # Runs handler1; on {:ok, encd} feeds encd into handler2, otherwise passes
  # the value through as-is. Any thrown/raised value becomes {:error, kind,
  # reason}. Always replies with the result, keeping `state` unchanged.
  defp echain(state, handler1, handler2) do
    retval =
      try do
        case handler1.() do
          {:ok, encd} -> handler2.(encd)
          x -> x
        end
      catch
        y, z -> {:error, y, z}
      end

    {:reply, retval, state}
  end
end