refactor(PacketSpool): :keep_only should be a call, make :fetch more resilient
This commit is contained in:
parent
85e31bbca0
commit
c74d1b061c
|
@ -42,7 +42,8 @@ defmodule Floof.PacketSpool do
|
|||
end
|
||||
|
||||
def keep_only(hashes) do
|
||||
GenServer.cast(__MODULE__, {:keep_only, hashes})
|
||||
# we need backpressure here
|
||||
GenServer.call(__MODULE__, {:keep_only, hashes})
|
||||
end
|
||||
|
||||
### server interface
|
||||
|
@ -99,11 +100,73 @@ defmodule Floof.PacketSpool do
|
|||
|
||||
@impl true
|
||||
def handle_call({:fetch, hash}, _, state) do
|
||||
echain(
|
||||
state,
|
||||
fn -> File.read(hash_to_path(hash, state)) end,
|
||||
&:FloofProtocol.decode(:XferBlob, &1)
|
||||
)
|
||||
retval =
|
||||
try do
|
||||
path = hash_to_path(hash, state)
|
||||
|
||||
case File.read(path) do
|
||||
{:ok, encd} ->
|
||||
case :FloofProtocol.decode(:XferBlob, encd) do
|
||||
{:ok, data} ->
|
||||
{:ok, data}
|
||||
|
||||
{:error, e} ->
|
||||
Logger.error(
|
||||
"spool: invalid content in object #{inspect(hash)}, not decodable: #{inspect(e)} -> deleting"
|
||||
)
|
||||
|
||||
case File.rm(path) do
|
||||
:ok ->
|
||||
nil
|
||||
|
||||
{:error, e} ->
|
||||
Logger.error("spool: unable to delete object #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end
|
||||
|
||||
x ->
|
||||
x
|
||||
end
|
||||
catch
|
||||
y, z -> {:error, y, z}
|
||||
end
|
||||
|
||||
{:reply, retval, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:keep_only, hashes}, _, state) do
|
||||
{:ok, items} = File.ls(state)
|
||||
|
||||
present =
|
||||
MapSet.new(
|
||||
Enum.flat_map(items, fn x ->
|
||||
case Base.url_decode64(x) do
|
||||
{:ok, y} ->
|
||||
[y]
|
||||
|
||||
{:error, e} ->
|
||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||
[]
|
||||
end
|
||||
end)
|
||||
)
|
||||
|
||||
hashes = MapSet.new(hashes)
|
||||
to_remove = MapSet.difference(present, hashes)
|
||||
|
||||
if not Enum.empty?(to_remove) do
|
||||
Logger.debug("spool: to_remove #{inspect(to_remove)}")
|
||||
|
||||
for hash <- MapSet.difference(present, hashes) do
|
||||
case File.rm(hash_to_path(hash, state)) do
|
||||
:ok -> nil
|
||||
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
{:reply, :ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
@ -116,43 +179,6 @@ defmodule Floof.PacketSpool do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:keep_only, hashes}, state) do
|
||||
case File.ls(state) do
|
||||
{:error, e} ->
|
||||
Logger.error(
|
||||
"spool: unable to browse spool directory for garbage collection: #{inspect(e)}"
|
||||
)
|
||||
|
||||
{:ok, items} ->
|
||||
present =
|
||||
MapSet.new(
|
||||
Enum.flat_map(items, fn x ->
|
||||
case Base.url_decode64(x) do
|
||||
{:ok, y} ->
|
||||
[y]
|
||||
|
||||
{:error, e} ->
|
||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||
[]
|
||||
end
|
||||
end)
|
||||
)
|
||||
|
||||
hashes = MapSet.new(hashes)
|
||||
Logger.debug("spool: present #{inspect(present)} vs. to_keep #{inspect(hashes)}")
|
||||
|
||||
for hash <- MapSet.difference(present, hashes) do
|
||||
case File.rm(hash_to_path(hash, state)) do
|
||||
:ok -> nil
|
||||
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# internal interface
|
||||
|
||||
defp hash_to_path(hash, state) do
|
||||
|
|
Loading…
Reference in a new issue