diff --git a/.gitignore b/.gitignore index 27d84c9..f678f57 100644 --- a/.gitignore +++ b/.gitignore @@ -4,8 +4,12 @@ # User configuration # /config/ /.keydb/ +/.spool/ +/.sessions.dets /config/prod* /config/runtime.exs +/mock_spool/ +/mock_sessions.dets # If you run "mix test --cover", coverage assets end up here. /cover/ diff --git a/README.md b/README.md index 445cf45..79a8d57 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ if config_env() == :prod do config :floof, listen_opts: [ip: {192,168,0,1}] config :floof, pubkey_config: %{:keydb => ".keydb"} config :floof, spool_dir: ".spool" + config :floof, sessions_file: ".sessions.dets" end ``` @@ -49,6 +50,7 @@ if config_env() == :prod do config :floof, listen_port: nil config :floof, pubkey_config: %{:keydb => ".keydb"} config :floof, spool_dir: ".spool" + config :floof, sessions_file: ".sessions.dets" config :floof, upstreams: [ diff --git a/lib/floof.ex b/lib/floof.ex index b4edb05..4a1d05a 100644 --- a/lib/floof.ex +++ b/lib/floof.ex @@ -82,7 +82,8 @@ defmodule Floof do backlog = if sesskey != nil do # make sure that stuff gets handled uniformly - Floof.SessionManager.set(sesskey, dcdhash, dcd) + Floof.SessionManager.set_soft(sesskey, dcdhash) + Floof.PacketSpool.store_new(dcdhash, dcd) else :ok = send_summary(client, :requestpull, [dcdhash]) Map.put(backlog, dcdhash, dcd) @@ -100,7 +101,10 @@ defmodule Floof do {:tcp_closed, _} -> # put all the backlogged stuff back if possible if sesskey != nil do - Floof.SessionManager.set_multi(sesskey, backlog) + for {dcdhash, dcd} <- backlog do + Floof.SessionManager.set_soft(sesskey, dcdhash) + Floof.PacketSpool.store_new(dcdhash, dcd) + end Floof.SessionManager.detach(sesskey, self()) end @@ -120,8 +124,11 @@ defmodule Floof do case dcd do {:session, {:attach, key}} -> Floof.SessionManager.attach(key, self()) - Floof.SessionManager.set_multi(sesskey, backlog) - backlog + for {dcdhash, dcd} <- backlog do + Floof.SessionManager.set_soft(sesskey, dcdhash) + Floof.PacketSpool.store_new(dcdhash, dcd) + end + %{} {:summary, {:Summary, direction, hashes}} -> case direction do @@ -136,7 +143,10 @@ defmodule Floof do item = case Map.fetch(backlog, mhash) do {:ok, item} -> item - :error -> Floof.SessionManager.get(sesskey, mhash) + :error -> case Floof.PacketSpool.fetch(mhash) do + {:ok, item} -> item + _ -> nil + end end if item != nil do diff --git a/lib/floof/application.ex b/lib/floof/application.ex index 13505e6..2951af7 100644 --- a/lib/floof/application.ex +++ b/lib/floof/application.ex @@ -17,6 +17,7 @@ defmodule Floof.Application do pubkey_mgr = Application.fetch_env!(:floof, :pubkey_mgr) pubkey_config = Application.fetch_env!(:floof, :pubkey_config) spool_dir = Application.fetch_env!(:floof, :spool_dir) + sessions_file = Application.fetch_env!(:floof, :sessions_file) children = [ @@ -33,7 +34,7 @@ defmodule Floof.Application do pubkey_mgr: pubkey_mgr, pubkey_config: pubkey_config )}, - {Floof.SessionManager, %{}} + {Floof.SessionManager, {sessions_file, []}} ] ++ if listen_port != nil do [{Task, fn -> Floof.accept(listen_port, listen_opts) end}] diff --git a/lib/floof/packet_spool.ex b/lib/floof/packet_spool.ex index 0e93062..a2d30b7 100644 --- a/lib/floof/packet_spool.ex +++ b/lib/floof/packet_spool.ex @@ -18,7 +18,11 @@ defmodule Floof.PacketSpool do ## packet is expected to be of type :XferBlob def store(hash, packet) do - GenServer.call(__MODULE__, {:store, hash, packet}) + GenServer.call(__MODULE__, {:store, hash, packet, true}) + end + + def store_new(hash, packet) do + GenServer.call(__MODULE__, {:store, hash, packet, false}) end def fetch(hash) do @@ -29,6 +33,10 @@ defmodule Floof.PacketSpool do GenServer.cast(__MODULE__, {:drop, hash}) end + def keep_only(hashes) do + GenServer.cast(__MODULE__, {:keep_only, hashes}) + end + ### server interface @impl true @@ -37,11 +45,18 @@ defmodule Floof.PacketSpool do end @impl true - def handle_call({:store, hash, packet}, _, state) do + def handle_call({:store, hash, packet, overwrite}, _, state) do + path = hash_to_path(hash, state) echain( state, - fn -> :FloofProtocol.encode(:XferBlob, packet) end, - &File.write(hash_to_path(hash, state), &1) + fn -> + if not overwrite and File.exists?(path) do + :ok + else + :FloofProtocol.encode(:XferBlob, packet) + end + end, + &File.write(path, &1) ) end @@ -63,6 +78,33 @@ defmodule Floof.PacketSpool do {:noreply, state} end + @impl true + def handle_cast({:keep_only, hashes}, state) do + case File.ls(state) do + {:error, e} -> + Logger.error("spool: unable to browse spool directory for garbage collection: #{inspect(e)}") + + {:ok, items} -> + present = MapSet.new(Enum.flat_map(items, fn x -> + case Base.url_decode64(x) do + {:ok, y} -> [y] + {:error, e} -> + Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}") + [] + end + end)) + hashes = MapSet.new(hashes) + Logger.debug("spool: present #{inspect(present)} vs. to_keep #{inspect(hashes)}") + for hash <- MapSet.difference(present, hashes) do + case File.rm(hash_to_path(hash, state)) do + :ok -> nil + {:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}") + end + end + end + {:noreply, state} + end + # internal interface defp hash_to_path(hash, state) do diff --git a/lib/floof/session_manager.ex b/lib/floof/session_manager.ex index 0135e2e..6767e4e 100644 --- a/lib/floof/session_manager.ex +++ b/lib/floof/session_manager.ex @@ -3,17 +3,20 @@ defmodule Floof.SessionManager do require Logger require Record - # structure of data - # {hash=>packet storage, sesskey=>MapSet(hashes) dets data} + # structure of data := %{dets file, subscription Map} + # dets data := {sesskey, hash} (via bag) - Record.defrecord(:sessent, sesskey: "", hashes: MapSet.new()) + defstruct dets_file: "/var/spool/floof/sess.dets", subs: %{} - def start_link(initial_value) do - Agent.start_link(fn -> initial_value end, name: __MODULE__) + def start_link({file, opts}) do + Agent.start_link(fn -> + {:ok, file} = :dets.open_file(file, opts ++ [type: :bag]) + %Floof.SessionManager{dets_file: file} + end, name: __MODULE__) end def attach(key, pid) do - modify_subscription(key, fn oldpid, m -> + modify_subscription(key, fn oldpid, dets_file -> if oldpid != pid do Floof.Distributor.unregister(pid) @@ -23,8 +26,10 @@ defmodule Floof.SessionManager do end end - if not Enum.empty?(m) do - send(pid, {:SessionPushed, key}) + case :dets.lookup(dets_file, key) do + [] -> nil + {:error, _} -> nil + _ -> send(pid, {:SessionPushed, key}) end pid @@ -44,114 +49,72 @@ defmodule Floof.SessionManager do defp modify_subscription(key, handler) do Agent.cast(__MODULE__, fn state -> - {_, state2} = - Map.get_and_update(state, key, fn ent -> - {oldpid, m} = entry_dfl(ent) - {nil, {handler.(oldpid, m), m}} + {_, subs2} = + Map.get_and_update(state.subs, key, fn oldpid -> + {nil, handler.(oldpid, state.dets_file)} end) - state2 + %Floof.SessionManager{state | subs: subs2} end) end - def set(key, subkey, value) do - set_one_internal(key, &Map.put(&1, subkey, value)) - end - - def set_multi(_, %{}) do - nil - end - - def set_multi(key, backlog) do - set_one_internal(key, &Map.merge(&1, backlog)) - end - def set_for_all(subkey, value, origin) do + Floof.PacketSpool.store(subkey, value) Agent.cast(__MODULE__, fn state -> - for {key, {sub, m}} <- state, into: %{} do - m2 = - if origin != sub do - if sub != nil do - send(sub, {:SessionPushed, key}) - end + filter_keys = MapSet.new(for {key, sub} <- state.subs do + if origin != sub and sub != nil do + send(sub, {:SessionPushed, key, subkey}) + key + else + nil + end + end) - Map.put(m, subkey, value) - else - m - end + {:ok, all_keys} = all_session_keys(state) + added_dets_ents = for key <- MapSet.difference(all_keys, filter_keys), do: {key, subkey} + :ok = :dets.insert(state.dets_file, added_dets_ents) - {key, {sub, m2}} - end + state + end) + end + + def set_soft(key, subkey) do + Agent.cast(__MODULE__, fn state -> + :ok = :dets.insert(state.dets_file, {key, subkey}) + state end) end def peek(key) do Agent.get( __MODULE__, - &MapSet.new( - case Map.get(&1, key) do - nil -> [] - {_, m} -> Map.keys(m) - end - ) - ) - end - - def get(key, subkey) do - Agent.get( - __MODULE__, - &case Map.get(&1, key) do - nil -> nil - {_, m} -> Map.get(m, subkey) + &case :dets.lookup(&1.dets_file, key) do + {:error, _} -> [] + items -> Enum.map(items, fn {_, x} -> x end) end ) end - def delete(key, subkey) do - xdrop_internal(key, &Map.delete(&1, subkey)) - end - def drop(key, subkeys) do - xdrop_internal(key, &Map.drop(&1, subkeys)) + Agent.cast(__MODULE__, fn state -> + dets_file = state.dets_file + :ok = :dets.delete_object(dets_file, Enum.map(subkeys, fn x -> {key, x} end)) + + # garbage collect unused objects + case :dets.match(dets_file, {'_', '$1'}) do + {:error, e} -> + Logger.error("garbage collection error: #{inspect(e)}") + + keys -> + Floof.PacketSpool.keep_only(for [key] <- keys, do: key) + end + end) end - defp entry_dfl(got) do - case got do - nil -> {nil, %{}} - x -> x + defp all_session_keys(state) do + case :dets.select(state.dets_file, [{{'_', '_'}, [], ['$1']}]) do + {:error, x} -> {:error, x} + items -> {:ok, Enum.map(items, fn {x} -> x end)} end end - - defp set_one_internal(key, handler) do - Agent.cast(__MODULE__, fn state -> - {sub, state2} = - Map.get_and_update(state, key, fn ent -> - {sub, m} = entry_dfl(ent) - {sub, {sub, handler.(m)}} - end) - - if sub != nil do - send(sub, {:SessionPushed, key}) - end - - state2 - end) - end - - defp xdrop_internal(key, handler) do - Agent.cast(__MODULE__, fn state -> - {_, m} = - Map.get_and_update(state, key, fn ent -> - {sub, m} = entry_dfl(ent) - - case {sub, handler.(m)} do - {nil, %{}} -> :pop - x -> {nil, x} - end - end) - - m - end) - end end - diff --git a/mix.exs b/mix.exs index f4ad846..2e6ab68 100644 --- a/mix.exs +++ b/mix.exs @@ -35,8 +35,13 @@ defmodule Floof.MixProject do # which get added to all processed messages set_markers: [], pubkey_mgr: Floof.PubkeyMgr.FileDB, + # file locations + # the keydb is only read, and is used to check message signatures pubkey_config: %{:keydb => "mock_keydb"}, - spool_dir: "mock_spool" + # the spool dir saves messages in-transit, and is used to prevent RAM OOM + spool_dir: "mock_spool", + # the session file is used for persistence of session data across restarts + session_file: "mock_sessions.dets", ] ] end