defmodule Floof.PacketSpool do @moduledoc """ disk-backed packet spooling management saves packets to disk, and manages encoding/decoding the user is encouraged to use a backing disk with long commit times to improve performance and avoid disk churn """ use GenServer require Logger def start_link(initval \\ "/var/spool/floof") do GenServer.start_link(__MODULE__, initval, name: __MODULE__) end ### client interface ## packet is expected to be of type :XferBlob def store(hash, packet) do GenServer.call(__MODULE__, {:store, hash, packet, true}, :infinity) end def store_multi(packets) do GenServer.call(__MODULE__, {:store, packets, true}, :infinity) end def store_new(hash, packet) do GenServer.call(__MODULE__, {:store, hash, packet, false}, :infinity) end def store_new_multi(packets) do GenServer.call(__MODULE__, {:store, packets, false}, :infinity) end def fetch(hash) do GenServer.call(__MODULE__, {:fetch, hash}, :infinity) end def drop(hash) do GenServer.cast(__MODULE__, {:drop, hash}) end ### server interface @impl true def init(state) do :ok = File.mkdir_p(state) {:ok, state} end @impl true def handle_call({:store, hash, packet, overwrite}, _, state) do path = hash_to_path(hash, state) echain( state, fn -> if not overwrite and File.exists?(path) do :ok else :FloofProtocol.encode(:XferBlob, packet) end end, &File.write(path, &1) ) end @impl true def handle_call({:store, packets, overwrite}, _, state) do retval = try do packets = for {hash, packet} <- packets, do: {hash_to_path(hash, state), packet} # prepare packets list, filter all packets which shouldn't be overwritten packets = if overwrite do packets else Enum.filter(packets, fn {path, _} -> File.exists?(path) end) end for {path, packet} <- packets do {:ok, encd} = :FloofProtocol.encode(:XferBlob, packet) :ok = File.write(path, encd) end :ok catch y, z -> {:error, y, z} end {:reply, retval, state} end @impl true def handle_call({:fetch, hash}, _, state) do retval = try do path = hash_to_path(hash, state) case File.read(path) do {:ok, encd} -> case :FloofProtocol.decode(:XferBlob, encd) do {:ok, data} -> {:ok, data} {:error, e} -> Logger.error( "spool: invalid content in object #{inspect(hash)}, not decodable: #{inspect(e)} -> deleting" ) case File.rm(path) do :ok -> nil {:error, e} -> Logger.error("spool: unable to delete object #{inspect(hash)}: #{inspect(e)}") end end x -> x end catch y, z -> {:error, y, z} end {:reply, retval, state} end @impl true def handle_call({:collect_garbage_keep_only, to_keep}, _, state) do {:ok, items} = File.ls(state) present = MapSet.new( Enum.flat_map(items, fn x -> case Base.url_decode64(x) do {:ok, y} -> [y] {:error, e} -> Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}") [] end end) ) to_keep = MapSet.new(to_keep) to_remove = MapSet.difference(present, to_keep) if not Enum.empty?(to_remove) do Logger.debug("spool: to_remove #{inspect(to_remove)}") for hash <- to_remove do case File.rm(hash_to_path(hash, state)) do :ok -> nil {:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}") end end end {:noreply, state} end @impl true def handle_cast({:drop, hash}, state) do case File.rm(hash_to_path(hash, state)) do :ok -> nil {:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}") end {:noreply, state} end # internal interface defp hash_to_path(hash, state) do state <> "/" <> Base.url_encode64(hash) end defp echain(state, handler1, handler2) do retval = try do case handler1.() do {:ok, encd} -> handler2.(encd) x -> x end catch y, z -> {:error, y, z} end {:reply, retval, state} end end