2022-11-22 15:41:59 +00:00
|
|
|
defmodule Floof do
|
|
|
|
@moduledoc """
|
|
|
|
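TCP transport for Floof peers: `accept/2` listens for incoming connections, `connect/3` opens an outgoing one, and both exchange `:FloofProtocol` messages framed with a 4-byte length prefix.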
|
|
|
|
"""
|
|
|
|
|
|
|
|
require Logger
|
|
|
|
|
2022-11-24 19:29:30 +00:00
|
|
|
@doc """
Listens on `port` and serves incoming connections.

The listening socket is opened in binary, passive mode with a 4-byte length
prefix and `reuseaddr: true`. `opts` is appended to these defaults and passed
to `:gen_tcp.listen/2`.

The accept loop never returns normally; whenever it terminates, the listening
socket is closed.
"""
def accept(port, opts \\ []) do
  listen_opts = [:binary, packet: 4, active: false, reuseaddr: true] ++ opts
  {:ok, listener} = :gen_tcp.listen(port, listen_opts)

  try do
    Logger.info("Accepting connections on #{port}")
    loop_acceptor(listener)
  after
    # runs on crash or exit of the accept loop as well
    :ok = :gen_tcp.close(listener)
  end
end
|
|
|
|
|
2022-11-22 23:17:46 +00:00
|
|
|
# Accepts connections on the listening `socket` forever.
#
# Each accepted client is served by its own task started under
# `Floof.TaskSupervisor`. Ownership of the client socket is transferred to that
# task before the socket is switched to active mode, so socket messages are
# delivered to the task rather than to the acceptor.
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)

  {:ok, pid} =
    Task.Supervisor.start_child(Floof.TaskSupervisor, fn ->
      # `serve/3` only ever leaves by exiting or raising, so the close has to
      # live in `after` to be reachable at all.
      try do
        serve(client, %{}, nil)
      after
        :ok = :gen_tcp.close(client)
      end
    end)

  :ok = :gen_tcp.controlling_process(client, pid)

  # switch to active now that we handed over the client
  case :inet.setopts(client, active: true) do
    :ok ->
      Floof.Distributor.register(pid)

    {:error, reason} ->
      # A socket that stays passive never delivers data or close
      # notifications, so the task would wait in `receive` for socket traffic
      # that cannot arrive. Stop it instead of registering it.
      Logger.warn("dropping client, unable to activate socket: #{inspect(reason)}")
      Task.Supervisor.terminate_child(Floof.TaskSupervisor, pid)
  end

  loop_acceptor(socket)
end
|
|
|
|
|
2022-11-25 00:29:17 +00:00
|
|
|
@doc """
Connects to the peer at `host`:`port` and serves the connection in the calling process.

`host` may be anything `:gen_tcp.connect/3` accepts (charlist, atom, IP tuple);
an Elixir string is converted to a charlist first. The socket is opened in
binary, active mode with a 4-byte length prefix, and the calling process is
registered with `Floof.Distributor`.

If `sesskey` is not `nil`, a session attach request for that key is sent to the
peer before the message loop starts.

This function does not return normally: the message loop runs until the
process exits (with reason `:normal` once the peer closes the connection) or
crashes. The socket is closed in either case.
"""
def connect(host, port, sesskey \\ nil) do
  # `:gen_tcp.connect/3` does not take Elixir strings as host names
  address = if is_binary(host), do: String.to_charlist(host), else: host

  # `:binary` matches the listening side and avoids a list-to-binary copy per message
  {:ok, socket} = :gen_tcp.connect(address, port, [:binary, packet: 4, active: true])

  try do
    Floof.Distributor.register(self())
    Logger.info("Established connection to #{inspect(host)}:#{port}")

    if sesskey != nil do
      {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:session, {:attach, sesskey}})
      :ok = :gen_tcp.send(socket, encd)
    end

    serve(socket, %{}, nil)
  after
    :ok = :gen_tcp.close(socket)
  end
end
|
|
|
|
|
2022-11-24 22:14:11 +00:00
|
|
|
# Message loop for a single peer connection.
#
#   * `client`  - the connection's socket
#   * `backlog` - map of message hash => message for messages that were offered
#                 to the peer (`:requestpull`) while no session was attached
#   * `sesskey` - key of the attached session, or `nil`
#
# Handles one mailbox message per iteration and recurses with the updated
# state. The loop ends only through `exit(:normal)` when the socket is reported
# closed, or through a crash.
defp serve(client, backlog, sesskey) do
  {backlog, sesskey} =
    receive do
      {:tcp, _, message} ->
        # the socket delivers a list instead of a binary when it was not
        # opened in binary mode; normalise both forms
        {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, IO.iodata_to_binary(message))
        {handle_incoming(client, backlog, sesskey, dcd), sesskey}

      {:fwdxfer, {dcdhash, dcd}} ->
        if sesskey != nil do
          # make sure that stuff gets handled uniformly
          Floof.SessionManager.set_soft(sesskey, dcdhash)
          Floof.PacketSpool.store_new(dcdhash, dcd)
          # the backlog itself is untouched here; it must not be replaced by
          # the return value of `store_new/2`
          {backlog, sesskey}
        else
          :ok = send_summary(client, :requestpull, [dcdhash])
          {Map.put(backlog, dcdhash, dcd), sesskey}
        end

      {:SessionPushed, pushed_key} ->
        # the key carried by the message becomes this connection's session key
        :ok = send_summary(client, :requestpull, Floof.SessionManager.peek(pushed_key))
        {backlog, pushed_key}

      {:SessionDetached, _} ->
        {backlog, nil}

      {:tcp_closed, _} ->
        # put all the backlogged stuff back if possible
        if sesskey != nil do
          for {dcdhash, dcd} <- backlog do
            Floof.SessionManager.set_soft(sesskey, dcdhash)
            Floof.PacketSpool.store_new(dcdhash, dcd)
          end

          Floof.SessionManager.detach(sesskey, self())
        end

        exit(:normal)

      x ->
        # anything else is logged and dropped so it does not pile up in the mailbox
        Logger.warn("unable to handle request: #{inspect(x)}")
        {backlog, sesskey}
    end

  serve(client, backlog, sesskey)
end
|
|
|
|
|
2022-11-25 16:16:44 +00:00
|
|
|
# Processes one decoded protocol message `dcd` received from the peer and
# returns the updated backlog.
#
#   * `client`  - socket used for replies
#   * `backlog` - map of message hash => message offered to the peer so far
#   * `sesskey` - key of the currently attached session, or `nil`
#
# Raises `CaseClauseError` for messages of any other shape.
defp handle_incoming(client, backlog, sesskey, dcd) do
  Logger.debug("got packet #{inspect(dcd)}")

  case dcd do
    {:session, {:attach, key}} ->
      Floof.SessionManager.attach(key, self())

      # Move everything offered so far into the session that was just
      # attached. This must use `key`: `sesskey` still holds the state from
      # before this message, and the backlog is only ever filled while that
      # is `nil`.
      for {dcdhash, item} <- backlog do
        Floof.SessionManager.set_soft(key, dcdhash)
        Floof.PacketSpool.store_new(dcdhash, item)
      end

      %{}

    {:summary, {:Summary, :requestpull, hashes}} ->
      {to_pull, to_drop} = partition_hashes(hashes)
      :ok = send_summary(client, :pull, to_pull)
      :ok = send_summary(client, :drop, to_drop)
      backlog

    {:summary, {:Summary, :pull, hashes}} ->
      Enum.each(hashes, fn mhash ->
        case fetch_item(backlog, mhash) do
          {:ok, item} when item != nil ->
            {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
            :ok = :gen_tcp.send(client, encd)

          _ ->
            # unknown hash: nothing to send
            :ok
        end
      end)

      backlog

    {:summary, {:Summary, :drop, hashes}} ->
      # NOTE(review): called even when `sesskey` is nil, unlike the guarded
      # session calls in `serve/3` - confirm `drop/2` copes with that
      Floof.SessionManager.drop(sesskey, hashes)
      Map.drop(backlog, hashes)

    {:xfer, xf} ->
      mhash = Floof.Message.emit(xf)
      # notify the remote end that we successfully consumed the message
      # non-fatal if this fails
      send_summary(client, :drop, [mhash])
      backlog
  end
end

# Looks `mhash` up in the in-memory backlog first and falls back to the packet spool.
defp fetch_item(backlog, mhash) do
  case Map.fetch(backlog, mhash) do
    {:ok, _item} = hit -> hit
    :error -> Floof.PacketSpool.fetch(mhash)
  end
end
|
|
|
|
|
2022-11-24 22:11:09 +00:00
|
|
|
# Sends a `:summary` message with the given `direction` and the ids in `dcdids`
# (any enumerable) over `client`.
#
# Nothing is sent for an empty collection, in which case `:ok` is returned;
# otherwise the result of `:gen_tcp.send/2` is returned.
defp send_summary(client, direction, dcdids) do
  case Enum.to_list(dcdids) do
    [] ->
      :ok

    ids ->
      {:ok, payload} = :FloofProtocol.encode(:ProtoMessage, {:summary, {:Summary, direction, ids}})
      :gen_tcp.send(client, payload)
  end
end
|
2022-11-24 22:11:09 +00:00
|
|
|
|
|
|
|
# Splits the hashes in `xs` into two `MapSet`s according to the flag that
# `Floof.DistributorSeen.have_we_multi/1` pairs with each of them.
#
# Returns `{lo, hi}`: `lo` holds the hashes whose flag is falsy, `hi` those
# whose flag is truthy. The caller treats them as "to pull" and "to drop".
# NOTE(review): presumably the flag says whether the message was already seen
# locally - confirm against `Floof.DistributorSeen`.
defp partition_hashes(xs) do
  flagged = Enum.to_list(Floof.DistributorSeen.have_we_multi(xs))
  empty = MapSet.new()
  partition_list(flagged, empty, empty)
end
|
|
|
|
|
|
|
|
# Distributes the keys of a list of `{key, flag}` pairs over two sets.
#
# Keys with a truthy flag are added to `hi`, all others to `lo`; the updated
# sets are returned as `{lo, hi}`.
defp partition_list(pairs, lo, hi) when is_list(pairs) do
  Enum.reduce(pairs, {lo, hi}, fn {key, flag}, {falsy, truthy} ->
    if flag do
      {falsy, MapSet.put(truthy, key)}
    else
      {MapSet.put(falsy, key), truthy}
    end
  end)
end
|
2022-11-22 15:41:59 +00:00
|
|
|
end
|