floof/lib/floof.ex
2022-11-28 01:05:32 +01:00

253 lines
6.7 KiB
Elixir

defmodule Floof do
@moduledoc """
Documentation for `Floof`.
"""
require Logger
@doc """
Listens on `port` and serves every incoming connection.

The listening socket is opened in binary mode with a 4-byte length prefix
(`packet: 4`), passive, and with `reuseaddr` enabled; `opts` is appended to
these options. Control is then handed to the accept loop, which recurses
indefinitely, so this function does not return normally. The listening
socket is closed when the loop is left.
"""
def accept(port, opts \\ []) do
  listen_opts = [:binary, packet: 4, active: false, reuseaddr: true] ++ opts
  {:ok, listener} = :gen_tcp.listen(port, listen_opts)

  try do
    Logger.info("Accepting connections on #{port}")
    loop_acceptor(listener)
  after
    :ok = :gen_tcp.close(listener)
  end
end
# Accepts one client on the listening `socket`, starts a task under
# `Floof.TaskSupervisor` that runs `serve/3` for it (closing the client
# socket when `serve/3` is left), and then recurses to wait for the next
# client.
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)

  handler = fn ->
    try do
      serve(client, %{}, nil)
    after
      :ok = :gen_tcp.close(client)
    end
  end

  {:ok, handler_pid} = Task.Supervisor.start_child(Floof.TaskSupervisor, handler)

  # Ownership goes to the task first; only afterwards is the socket switched
  # from passive to active mode, so socket messages reach the task.
  :ok = :gen_tcp.controlling_process(client, handler_pid)
  :inet.setopts(client, active: true)

  Floof.Distributor.register(handler_pid)
  loop_acceptor(socket)
end
@doc """
Connects to `host`:`port` and serves the connection in the calling process.

`host` may be anything `:gen_tcp.connect/3` accepts; an Elixir string
(binary) is converted to a charlist first. The socket is opened in binary
mode with a 4-byte length prefix and in active mode, matching the options
used for accepted connections.

The calling process is registered with `Floof.Distributor`. When `sesskey`
is not `nil`, a session-attach message is sent to the peer and the calling
process is attached to that session locally before the message loop starts.

The socket is closed when the message loop is left.
"""
def connect(host, port, sesskey \\ nil) do
  # :gen_tcp does not accept binaries as host names.
  address = if is_binary(host), do: String.to_charlist(host), else: host

  # :binary keeps payloads as binaries, consistent with the listener side,
  # instead of delivering every frame as a list of bytes.
  {:ok, socket} = :gen_tcp.connect(address, port, [:binary, packet: 4, active: true])

  try do
    Floof.Distributor.register(self())
    Logger.info("Established connection to #{inspect(host)}:#{port}")

    if sesskey != nil do
      {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:session, {:attach, sesskey}})
      :ok = :gen_tcp.send(socket, encd)
      Floof.SessionManager.attach(sesskey, self())
    end

    serve(socket, %{}, sesskey)
  after
    :ok = :gen_tcp.close(socket)
  end
end
# Message loop for one connection.
#
# `client` is the socket, `backlog` is a map from packet hash to packet for
# packets announced to the peer while no session is attached, and `sesskey`
# is the attached session key or `nil`. Each handled message produces the
# next `{backlog, sesskey}` pair, which is fed into the next iteration.
#
# Handled messages:
#   * `{:tcp, _, data}`            - decoded and passed to `handle_incoming/4`
#   * `{:fwdxfer, {hash, packet}}` - a packet to offer to the peer
#   * `{:fwddrop, hash}`           - tell the peer to drop a hash (coalesced)
#   * `{:SessionPushed, key}`      - present session content (coalesced)
#   * `{:SessionDetached, key}`    - forget `sesskey` if it equals `key`
#   * `{:tcp_closed, _}`           - move the backlog into the session (if
#                                    any), detach, and exit normally
# Anything else is logged and ignored.
defp serve(client, backlog, sesskey) do
  {backlog, sesskey} =
    receive do
      {:tcp, _, message} ->
        # Sockets opened without :binary deliver the payload as a list.
        message = if is_binary(message), do: message, else: :erlang.list_to_binary(message)
        {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
        handle_incoming(client, backlog, sesskey, dcd)

      {:fwdxfer, {dcdhash, dcd}} ->
        if sesskey != nil do
          # make sure that stuff gets handled uniformly
          Floof.SessionManager.set_soft_multi(sesskey, [dcdhash])
          Floof.PacketSpool.store_new(dcdhash, dcd)
          # The backlog must stay a map: the result of store_new/2 is NOT the
          # new backlog and must not replace it.
          {backlog, sesskey}
        else
          :ok = send_summary(client, :requestpull, [dcdhash])
          {Map.put(backlog, dcdhash, dcd), sesskey}
        end

      {:fwddrop, dcdhash} ->
        to_drop = coalesce_fwddrops(MapSet.new([dcdhash]))
        :ok = send_summary(client, :drop, to_drop)
        {backlog, sesskey}

      {:SessionPushed, pushed} ->
        sesskeys = coalesce_sesspushes([pushed])

        # NOTE(review): the most recently received pushed key replaces the
        # tracked session key here (behavior kept as-is) - confirm that this
        # is intended rather than accidental variable shadowing.
        [latest | _] = sesskeys

        dcdids =
          sesskeys
          |> MapSet.new()
          |> Enum.map(&MapSet.new(Floof.SessionManager.peek(&1)))
          |> Enum.reduce(&MapSet.union(&1, &2))

        Logger.debug("presenting session content #{inspect(dcdids)})")
        :ok = send_summary(client, :requestpull, dcdids)
        {backlog, latest}

      {:SessionDetached, old_sesskey} ->
        remaining = if sesskey == old_sesskey, do: nil, else: sesskey
        {backlog, remaining}

      {:tcp_closed, _} ->
        # put all the backlogged stuff back if possible
        if sesskey != nil do
          transfer_backlog_to_session(sesskey, backlog)
          Floof.SessionManager.detach(sesskey, self())
        end

        exit(:normal)

      x ->
        Logger.warn("unable to handle request: #{inspect(x)}")
        {backlog, sesskey}
    end

  serve(client, backlog, sesskey)
end
# Handles one decoded protocol message `dcd` received from the peer and
# returns the updated `{backlog, sesskey}` pair.
#
#   * `{:session, {:attach, key}}` - detaches from the current session (if
#     any), attaches to `key`, moves the backlog into that session and
#     returns an empty backlog together with the new key.
#   * `{:summary, {:Summary, :requestpull, hashes}}` - answers with a `:pull`
#     summary and a `:drop` summary, split by `partition_hashes/1`.
#   * `{:summary, {:Summary, :pull, hashes}}` - sends each requested packet,
#     taken from the backlog when no session is attached and from
#     `Floof.PacketSpool` otherwise.
#   * `{:summary, {:Summary, :drop, hashes}}` - drops the hashes from the
#     session (if any) and from the backlog.
#   * `{:xfer, xf}` - emits the packet locally and queues a `:fwddrop` for it.
#
# Any other message shape raises (no catch-all clause), which terminates the
# connection process.
defp handle_incoming(client, backlog, sesskey, dcd) do
  Logger.debug("got packet #{inspect(dcd)}")

  case dcd do
    {:session, {:attach, sesskey_new}} ->
      if sesskey != nil do
        Floof.SessionManager.detach(sesskey, self())
      end

      Floof.SessionManager.attach(sesskey_new, self())
      transfer_backlog_to_session(sesskey_new, backlog)
      {%{}, sesskey_new}

    {:summary, {:Summary, :requestpull, hashes}} ->
      {to_pull, to_drop} = partition_hashes(hashes)
      :ok = send_summary(client, :pull, to_pull)
      :ok = send_summary(client, :drop, to_drop)
      {backlog, sesskey}

    {:summary, {:Summary, :pull, hashes}} ->
      Enum.each(hashes, fn mhash ->
        mbitem =
          if sesskey == nil do
            Map.fetch(backlog, mhash)
          else
            Floof.PacketSpool.fetch(mhash)
          end

        case mbitem do
          {:ok, item} ->
            {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
            :ok = :gen_tcp.send(client, encd)

          _ ->
            Logger.warn("packet #{Base.url_encode64(mhash)} gone missing")

            # Only the packet that is actually missing is dropped from the
            # session; the other requested hashes stay until the peer
            # confirms them with a :drop summary.
            if sesskey != nil do
              Floof.SessionManager.drop(sesskey, [mhash])
            end
        end
      end)

      {backlog, sesskey}

    {:summary, {:Summary, :drop, hashes}} ->
      if sesskey != nil do
        Floof.SessionManager.drop(sesskey, hashes)
      end

      {Map.drop(backlog, hashes), sesskey}

    {:xfer, xf} ->
      mhash = Floof.Message.emit(xf)
      # the following indirection is necessary
      # to reduce the amount of 'drop' notifications
      send(self(), {:fwddrop, mhash})
      {backlog, sesskey}
  end
end
# Sends a `{:Summary, direction, hashes}` message for `dcdids` to `client`.
#
# Nothing is encoded or sent when `dcdids` is empty; `:ok` is returned in
# that case. Otherwise the result of `:gen_tcp.send/2` is returned.
defp send_summary(client, direction, dcdids) do
  if Enum.empty?(dcdids) do
    :ok
  else
    payload = {:summary, {:Summary, direction, Enum.to_list(dcdids)}}
    {:ok, frame} = :FloofProtocol.encode(:ProtoMessage, payload)
    :gen_tcp.send(client, frame)
  end
end
# Hands the whole `backlog` over to the session `sesskey`: the backlog's
# hashes are passed to `Floof.SessionManager.set_soft_multi/2` and the
# backlog itself to `Floof.PacketSpool.store_new_multi/1`, which must
# answer `:ok`.
defp transfer_backlog_to_session(sesskey, backlog) do
  hashes = for {hash, _packet} <- backlog, do: hash
  Floof.SessionManager.set_soft_multi(sesskey, hashes)
  :ok = Floof.PacketSpool.store_new_multi(backlog)
end
# Collects further `{:SessionPushed, key}` messages from the mailbox,
# prepending each key to `sesskeys`, until none arrives within 10 ms.
# Returns the accumulated list (most recently received key first).
defp coalesce_sesspushes(sesskeys) do
  next =
    receive do
      {:SessionPushed, pushed} -> {:more, pushed}
    after
      10 -> :done
    end

  case next do
    {:more, pushed} -> coalesce_sesspushes([pushed | sesskeys])
    :done -> sesskeys
  end
end
# Collects further `{:fwddrop, hash}` messages from the mailbox into the
# `to_drop` set until none arrives within 10 ms, then returns the set.
defp coalesce_fwddrops(to_drop) do
  next =
    receive do
      {:fwddrop, hash} -> {:more, hash}
    after
      10 -> :done
    end

  case next do
    {:more, hash} -> to_drop |> MapSet.put(hash) |> coalesce_fwddrops()
    :done -> to_drop
  end
end
# Splits the hashes `xs` into `{lo, hi}` (two `MapSet`s) according to the
# flag that `Floof.DistributorSeen.have_we_multi/1` reports for each hash:
# hashes with a truthy flag end up in `hi`, the others in `lo`.
# (Presumably the flag means "already seen locally" - verify against
# `Floof.DistributorSeen`.)
defp partition_hashes(xs) do
  flagged = Floof.DistributorSeen.have_we_multi(xs)
  partition_list(Enum.to_list(flagged), MapSet.new(), MapSet.new())
end
# Distributes the keys of a list of `{key, flag}` pairs over two sets:
# a key is added to `hi` when its flag is truthy and to `lo` otherwise.
# Returns `{lo, hi}`.
defp partition_list(pairs, lo, hi) when is_list(pairs) do
  Enum.reduce(pairs, {lo, hi}, fn {key, flag}, {falsy_keys, truthy_keys} ->
    if flag do
      {falsy_keys, MapSet.put(truthy_keys, key)}
    else
      {MapSet.put(falsy_keys, key), truthy_keys}
    end
  end)
end
end