first try at persistence
This commit is contained in:
parent
61312e68ce
commit
dc7db318e8
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -4,8 +4,12 @@
|
|||
# User configuration
|
||||
# /config/
|
||||
/.keydb/
|
||||
/.spool/
|
||||
/.sessions.dets
|
||||
/config/prod*
|
||||
/config/runtime.exs
|
||||
/mock_spool/
|
||||
/mock_sessions.dets
|
||||
|
||||
# If you run "mix test --cover", coverage assets end up here.
|
||||
/cover/
|
||||
|
|
|
@ -37,6 +37,7 @@ if config_env() == :prod do
|
|||
config :floof, listen_opts: [ip: {192,168,0,1}]
|
||||
config :floof, pubkey_config: %{:keydb => ".keydb"}
|
||||
config :floof, spool_dir: ".spool"
|
||||
config :floof, sessions_file: ".sessions.dets"
|
||||
end
|
||||
```
|
||||
|
||||
|
@ -49,6 +50,7 @@ if config_env() == :prod do
|
|||
config :floof, listen_port: nil
|
||||
config :floof, pubkey_config: %{:keydb => ".keydb"}
|
||||
config :floof, spool_dir: ".spool"
|
||||
config :floof, sessions_file: ".sessions.dets"
|
||||
|
||||
config :floof,
|
||||
upstreams: [
|
||||
|
|
20
lib/floof.ex
20
lib/floof.ex
|
@ -82,7 +82,8 @@ defmodule Floof do
|
|||
backlog =
|
||||
if sesskey != nil do
|
||||
# make sure that stuff gets handled uniformly
|
||||
Floof.SessionManager.set(sesskey, dcdhash, dcd)
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
else
|
||||
:ok = send_summary(client, :requestpull, [dcdhash])
|
||||
Map.put(backlog, dcdhash, dcd)
|
||||
|
@ -100,7 +101,10 @@ defmodule Floof do
|
|||
{:tcp_closed, _} ->
|
||||
# put all the backlogged stuff back if possible
|
||||
if sesskey != nil do
|
||||
Floof.SessionManager.set_multi(sesskey, backlog)
|
||||
for {dcdhash, dcd} <- backlog do
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
end
|
||||
Floof.SessionManager.detach(sesskey, self())
|
||||
end
|
||||
|
||||
|
@ -120,8 +124,11 @@ defmodule Floof do
|
|||
case dcd do
|
||||
{:session, {:attach, key}} ->
|
||||
Floof.SessionManager.attach(key, self())
|
||||
Floof.SessionManager.set_multi(sesskey, backlog)
|
||||
backlog
|
||||
for {dcdhash, dcd} <- backlog do
|
||||
Floof.SessionManager.set_soft(sesskey, dcdhash)
|
||||
Floof.PacketSpool.store_new(dcdhash, dcd)
|
||||
end
|
||||
%{}
|
||||
|
||||
{:summary, {:Summary, direction, hashes}} ->
|
||||
case direction do
|
||||
|
@ -136,7 +143,10 @@ defmodule Floof do
|
|||
item =
|
||||
case Map.fetch(backlog, mhash) do
|
||||
{:ok, item} -> item
|
||||
:error -> Floof.SessionManager.get(sesskey, mhash)
|
||||
:error -> case Floof.PacketSpool.fetch(mhash) do
|
||||
{:ok, item} -> item
|
||||
_ -> nil
|
||||
end
|
||||
end
|
||||
|
||||
if item != nil do
|
||||
|
|
|
@ -17,6 +17,7 @@ defmodule Floof.Application do
|
|||
pubkey_mgr = Application.fetch_env!(:floof, :pubkey_mgr)
|
||||
pubkey_config = Application.fetch_env!(:floof, :pubkey_config)
|
||||
spool_dir = Application.fetch_env!(:floof, :spool_dir)
|
||||
sessions_file = Application.fetch_env!(:floof, :sessions_file)
|
||||
|
||||
children =
|
||||
[
|
||||
|
@ -33,7 +34,7 @@ defmodule Floof.Application do
|
|||
pubkey_mgr: pubkey_mgr,
|
||||
pubkey_config: pubkey_config
|
||||
)},
|
||||
{Floof.SessionManager, %{}}
|
||||
{Floof.SessionManager, {sessions_file, []}}
|
||||
] ++
|
||||
if listen_port != nil do
|
||||
[{Task, fn -> Floof.accept(listen_port, listen_opts) end}]
|
||||
|
|
|
@ -18,7 +18,11 @@ defmodule Floof.PacketSpool do
|
|||
|
||||
## packet is expected to be of type :XferBlob
|
||||
def store(hash, packet) do
|
||||
GenServer.call(__MODULE__, {:store, hash, packet})
|
||||
GenServer.call(__MODULE__, {:store, hash, packet, true})
|
||||
end
|
||||
|
||||
def store_new(hash, packet) do
|
||||
GenServer.call(__MODULE__, {:store, hash, packet, false})
|
||||
end
|
||||
|
||||
def fetch(hash) do
|
||||
|
@ -29,6 +33,10 @@ defmodule Floof.PacketSpool do
|
|||
GenServer.cast(__MODULE__, {:drop, hash})
|
||||
end
|
||||
|
||||
def keep_only(hashes) do
|
||||
GenServer.cast(__MODULE__, {:keep_only, hashes})
|
||||
end
|
||||
|
||||
### server interface
|
||||
|
||||
@impl true
|
||||
|
@ -37,11 +45,18 @@ defmodule Floof.PacketSpool do
|
|||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:store, hash, packet}, _, state) do
|
||||
def handle_call({:store, hash, packet, overwrite}, _, state) do
|
||||
path = hash_to_path(hash, state)
|
||||
echain(
|
||||
state,
|
||||
fn -> :FloofProtocol.encode(:XferBlob, packet) end,
|
||||
&File.write(hash_to_path(hash, state), &1)
|
||||
fn ->
|
||||
if not overwrite and File.exists?(path) do
|
||||
:ok
|
||||
else
|
||||
:FloofProtocol.encode(:XferBlob, packet)
|
||||
end
|
||||
end,
|
||||
&File.write(path, &1)
|
||||
)
|
||||
end
|
||||
|
||||
|
@ -63,6 +78,33 @@ defmodule Floof.PacketSpool do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:keep_only, hashes}, state) do
|
||||
case File.ls(state) do
|
||||
{:error, e} ->
|
||||
Logger.error("spool: unable to browse spool directory for garbage collection: #{inspect(e)}")
|
||||
|
||||
{:ok, items} ->
|
||||
present = MapSet.new(Enum.flat_map(items, fn x ->
|
||||
case Base.url_decode64(x) do
|
||||
{:ok, y} -> [y]
|
||||
{:error, e} ->
|
||||
Logger.warn("spool: unable to decode entry name #{x}: #{inspect(e)}")
|
||||
[]
|
||||
end
|
||||
end))
|
||||
hashes = MapSet.new(hashes)
|
||||
Logger.debug("spool: present #{inspect(present)} vs. to_keep #{inspect(hashes)}")
|
||||
for hash <- MapSet.difference(present, hashes) do
|
||||
case File.rm(hash_to_path(hash, state)) do
|
||||
:ok -> nil
|
||||
{:error, e} -> Logger.error("spool: unable to remove #{inspect(hash)}: #{inspect(e)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# internal interface
|
||||
|
||||
defp hash_to_path(hash, state) do
|
||||
|
|
|
@ -3,17 +3,20 @@ defmodule Floof.SessionManager do
|
|||
require Logger
|
||||
require Record
|
||||
|
||||
# structure of data
|
||||
# {hash=>packet storage, sesskey=>MapSet(hashes) dets data}
|
||||
# structure of data := %{dets file, subscription Map}
|
||||
# dets data := {sesskey, hash} (via bag)
|
||||
|
||||
Record.defrecord(:sessent, sesskey: "", hashes: MapSet.new())
|
||||
defstruct dets_file: "/var/spool/floof/sess.dets", subs: %{}
|
||||
|
||||
def start_link(initial_value) do
|
||||
Agent.start_link(fn -> initial_value end, name: __MODULE__)
|
||||
def start_link({file, opts}) do
|
||||
Agent.start_link(fn ->
|
||||
{:ok, file} = :dets.open_file(file, opts ++ [type: :bag])
|
||||
%Floof.SessionManager{dets_file: file}
|
||||
end, name: __MODULE__)
|
||||
end
|
||||
|
||||
def attach(key, pid) do
|
||||
modify_subscription(key, fn oldpid, m ->
|
||||
modify_subscription(key, fn oldpid, dets_file ->
|
||||
if oldpid != pid do
|
||||
Floof.Distributor.unregister(pid)
|
||||
|
||||
|
@ -23,8 +26,10 @@ defmodule Floof.SessionManager do
|
|||
end
|
||||
end
|
||||
|
||||
if not Enum.empty?(m) do
|
||||
send(pid, {:SessionPushed, key})
|
||||
case :dets.lookup(dets_file, key) do
|
||||
[] -> nil
|
||||
{:error, _} -> nil
|
||||
_ -> send(pid, {:SessionPushed, key})
|
||||
end
|
||||
|
||||
pid
|
||||
|
@ -44,114 +49,72 @@ defmodule Floof.SessionManager do
|
|||
|
||||
defp modify_subscription(key, handler) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
{_, state2} =
|
||||
Map.get_and_update(state, key, fn ent ->
|
||||
{oldpid, m} = entry_dfl(ent)
|
||||
{nil, {handler.(oldpid, m), m}}
|
||||
{_, subs2} =
|
||||
Map.get_and_update(state.subs, key, fn oldpid ->
|
||||
{nil, handler.(oldpid, state.dets_file)}
|
||||
end)
|
||||
|
||||
state2
|
||||
%Floof.SessionManager{state | subs: subs2}
|
||||
end)
|
||||
end
|
||||
|
||||
def set(key, subkey, value) do
|
||||
set_one_internal(key, &Map.put(&1, subkey, value))
|
||||
end
|
||||
|
||||
def set_multi(_, %{}) do
|
||||
nil
|
||||
end
|
||||
|
||||
def set_multi(key, backlog) do
|
||||
set_one_internal(key, &Map.merge(&1, backlog))
|
||||
end
|
||||
|
||||
def set_for_all(subkey, value, origin) do
|
||||
Floof.PacketSpool.store(subkey, value)
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
for {key, {sub, m}} <- state, into: %{} do
|
||||
m2 =
|
||||
if origin != sub do
|
||||
if sub != nil do
|
||||
send(sub, {:SessionPushed, key})
|
||||
end
|
||||
filter_keys = MapSet.new(for {key, sub} <- state.subs do
|
||||
if origin != sub and sub != nil do
|
||||
send(sub, {:SessionPushed, key, subkey})
|
||||
key
|
||||
else
|
||||
nil
|
||||
end
|
||||
end)
|
||||
|
||||
Map.put(m, subkey, value)
|
||||
else
|
||||
m
|
||||
end
|
||||
{:ok, all_keys} = all_session_keys(state)
|
||||
added_dets_ents = for key <- MapSet.difference(all_keys, filter_keys), do: {key, subkey}
|
||||
:ok = :dets.insert(state.dets_file, added_dets_ents)
|
||||
|
||||
{key, {sub, m2}}
|
||||
end
|
||||
state
|
||||
end)
|
||||
end
|
||||
|
||||
def set_soft(key, subkey) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
:ok = :dets.insert(state.dets_file, {key, subkey})
|
||||
state
|
||||
end)
|
||||
end
|
||||
|
||||
def peek(key) do
|
||||
Agent.get(
|
||||
__MODULE__,
|
||||
&MapSet.new(
|
||||
case Map.get(&1, key) do
|
||||
nil -> []
|
||||
{_, m} -> Map.keys(m)
|
||||
end
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
def get(key, subkey) do
|
||||
Agent.get(
|
||||
__MODULE__,
|
||||
&case Map.get(&1, key) do
|
||||
nil -> nil
|
||||
{_, m} -> Map.get(m, subkey)
|
||||
&case :dets.lookup(&1.dets_file, key) do
|
||||
{:error, _} -> []
|
||||
items -> Enum.map(items, fn {_, x} -> x end)
|
||||
end
|
||||
)
|
||||
end
|
||||
|
||||
def delete(key, subkey) do
|
||||
xdrop_internal(key, &Map.delete(&1, subkey))
|
||||
end
|
||||
|
||||
def drop(key, subkeys) do
|
||||
xdrop_internal(key, &Map.drop(&1, subkeys))
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
dets_file = state.dets_file
|
||||
:ok = :dets.delete_object(dets_file, Enum.map(subkeys, fn x -> {key, x} end))
|
||||
|
||||
# garbage collect unused objects
|
||||
case :dets.match(dets_file, {'_', '$1'}) do
|
||||
{:error, e} ->
|
||||
Logger.error("garbage collection error: #{inspect(e)}")
|
||||
|
||||
keys ->
|
||||
Floof.PacketSpool.keep_only(for [key] <- keys, do: key)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp entry_dfl(got) do
|
||||
case got do
|
||||
nil -> {nil, %{}}
|
||||
x -> x
|
||||
defp all_session_keys(state) do
|
||||
case :dets.select(state.dets_file, [{{'_', '_'}, [], ['$1']}]) do
|
||||
{:error, x} -> {:error, x}
|
||||
items -> {:ok, Enum.map(items, fn {x} -> x end)}
|
||||
end
|
||||
end
|
||||
|
||||
defp set_one_internal(key, handler) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
{sub, state2} =
|
||||
Map.get_and_update(state, key, fn ent ->
|
||||
{sub, m} = entry_dfl(ent)
|
||||
{sub, {sub, handler.(m)}}
|
||||
end)
|
||||
|
||||
if sub != nil do
|
||||
send(sub, {:SessionPushed, key})
|
||||
end
|
||||
|
||||
state2
|
||||
end)
|
||||
end
|
||||
|
||||
defp xdrop_internal(key, handler) do
|
||||
Agent.cast(__MODULE__, fn state ->
|
||||
{_, m} =
|
||||
Map.get_and_update(state, key, fn ent ->
|
||||
{sub, m} = entry_dfl(ent)
|
||||
|
||||
case {sub, handler.(m)} do
|
||||
{nil, %{}} -> :pop
|
||||
x -> {nil, x}
|
||||
end
|
||||
end)
|
||||
|
||||
m
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
7
mix.exs
7
mix.exs
|
@ -35,8 +35,13 @@ defmodule Floof.MixProject do
|
|||
# which get added to all processed messages
|
||||
set_markers: [],
|
||||
pubkey_mgr: Floof.PubkeyMgr.FileDB,
|
||||
# file locations
|
||||
# the keydb is only read, and is used to check message signatures
|
||||
pubkey_config: %{:keydb => "mock_keydb"},
|
||||
spool_dir: "mock_spool"
|
||||
# the spool dir saves messages in-transit, and is used to prevent RAM OOM
|
||||
spool_dir: "mock_spool",
|
||||
# the session file is used for persistence of session data across restarts
|
||||
session_file: "mock_sessions.dets",
|
||||
]
|
||||
]
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue