floof/lib/floof.ex

196 lines
4.9 KiB
Elixir
Raw Normal View History

2022-11-22 15:41:59 +00:00
defmodule Floof do
  @moduledoc """
  TCP transport for `Floof`.

  A connection is either accepted (`accept/2`) or opened (`connect/3`). Both
  ends use 4-byte length-prefixed framing (`packet: 4`) and exchange
  `:FloofProtocol` `:ProtoMessage` terms. Each connection is driven by a
  receive loop that keeps a per-connection *backlog*: a map from message hash
  to the message waiting to be transferred to the peer.

  Messages handled by the loop:

    * `{:session, {:attach, key}}` - the peer attaches to a session
    * `{:summary, {:Summary, direction, hashes}}` - hash negotiation, where
      `direction` is `:requestpull`, `:pull` or `:drop`
    * `{:xfer, message}` - an actual message transfer
  """

  require Logger

  @doc """
  Listens on `port` and serves every incoming connection in its own task.

  `opts` is appended to the default `:gen_tcp.listen/2` options
  (`:binary`, `packet: 4`, `active: false`, `reuseaddr: true`).

  The acceptor loop has no terminating clause, so this function does not
  return normally; it only leaves by raising (for example when listening,
  accepting or handing over a client fails). The listening socket is closed on
  the way out.
  """
  @spec accept(:inet.port_number(), list()) :: no_return()
  def accept(port, opts \\ []) do
    listen_opts = [:binary, packet: 4, active: false, reuseaddr: true] ++ opts
    {:ok, socket} = :gen_tcp.listen(port, listen_opts)

    Logger.info("Accepting connections on #{port}")

    try do
      loop_acceptor(socket)
    after
      # The loop never returns, so cleanup placed after it would be dead code.
      # The result is deliberately not matched: a failed close must not mask
      # the error that ended the loop.
      :gen_tcp.close(socket)
    end
  end

  # Accepts one client, hands it to a supervised task running `serve/3`, then
  # recurses to wait for the next client.
  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)

    {:ok, pid} =
      Task.Supervisor.start_child(Floof.TaskSupervisor, fn ->
        serve(client, %{}, nil)
        :ok = :gen_tcp.close(client)
      end)

    :ok = :gen_tcp.controlling_process(client, pid)
    # Switch to active mode only now that the task owns the client, so that
    # the `{:tcp, ...}` messages are delivered to the task and not to us.
    :inet.setopts(client, [{:active, true}])
    Floof.Distributor.register(pid)

    loop_acceptor(socket)
  end

  @doc """
  Connects to `host`:`port` and serves the connection in the calling process.

  The calling process is registered with `Floof.Distributor`. When `sesskey`
  is not `nil`, a `{:session, {:attach, sesskey}}` message is sent first. The
  function then blocks in the receive loop until the connection is closed,
  closes the socket and returns `:ok`.

  Raises if connecting, encoding or sending fails.
  """
  @spec connect(:inet.socket_address() | :inet.hostname(), :inet.port_number(), term()) :: :ok
  def connect(host, port, sesskey \\ nil) do
    {:ok, socket} = :gen_tcp.connect(host, port, packet: 4, active: true)

    Floof.Distributor.register(self())
    Logger.info("Established connection to #{inspect(host)}:#{port}")

    if sesskey != nil do
      {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:session, {:attach, sesskey}})
      :ok = :gen_tcp.send(socket, encd)
    end

    serve(socket, %{}, nil)
    :ok = :gen_tcp.close(socket)
  end

  # Per-connection receive loop.
  #
  #   * `client`  - the connected socket
  #   * `backlog` - map of message hash => message still to be sent to the peer
  #   * `sesskey` - session key of this connection; `nil` until a
  #     `{:SessionPushed, key}` message has been received
  #
  # Loops until `{:tcp_closed, _}` arrives.
  defp serve(client, backlog, sesskey) do
    receive do
      {:tcp, _, message} ->
        {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, to_binary(message))
        backlog = handle_incoming(client, backlog, dcd)
        serve(client, backlog, sesskey)

      {:fwdxfer, {dcdhash, dcd}} ->
        # Offer the new message to the peer and remember it until the peer
        # either pulls or drops it.
        :ok = send_summary(client, :requestpull, [dcdhash])
        serve(client, Map.put(backlog, dcdhash, dcd), sesskey)

      {:SessionPushed, key} ->
        {backlog, added_hashs} = extract_backlog(key, backlog, MapSet.new())
        :ok = send_summary(client, :requestpull, added_hashs)
        serve(client, backlog, key)

      {:tcp_closed, _} ->
        # Put all the backlogged `{hash, message}` pairs back into the session
        # if there is one, so they are not lost with this connection.
        if sesskey != nil do
          Enum.each(backlog, &Floof.SessionManager.push(sesskey, &1))
          Floof.SessionManager.deattach(sesskey, self())
        end

      x ->
        Logger.warn("unable to handle request: #{inspect(x)}")
        serve(client, backlog, sesskey)
    end
  end

  # `connect/3` opens its socket without the `:binary` option, so payloads may
  # arrive as byte lists instead of binaries; normalise both to a binary.
  defp to_binary(data) when is_binary(data), do: data
  defp to_binary(data), do: :erlang.list_to_binary(data)

  # Dispatches one decoded protocol message and returns the updated backlog.
  # Raises on a message shape that is not listed here.
  defp handle_incoming(client, backlog, dcd) do
    Logger.debug("got packet #{inspect(dcd)}")

    case dcd do
      {:session, {:attach, key}} ->
        Floof.Distributor.register_key(key, self())

        # The session contents are pulled in asynchronously through the
        # `{:SessionPushed, key}` clause of `serve/3`.
        if Floof.SessionManager.attach(key, self()) do
          send(self(), {:SessionPushed, key})
        end

        backlog

      {:summary, {:Summary, direction, hashes}} ->
        handle_summary(client, backlog, direction, hashes)

      {:xfer, xf} ->
        mhash = Floof.Message.emit(xf)
        # Notify the remote end that we successfully consumed the message;
        # non-fatal if this fails, hence the unmatched result.
        send_summary(client, :drop, [mhash])
        backlog
    end
  end

  # Handles a `:Summary` message according to its direction and returns the
  # updated backlog.
  #
  # `:requestpull` - the peer offers `hashes`. They are split by
  # `partition_hashes/1`; we answer with a `:pull` for one part and a `:drop`
  # for the other. Each answer is sent independently of the other, and
  # `send_summary/3` itself skips empty sets.
  defp handle_summary(client, backlog, :requestpull, hashes) do
    {to_pull, to_drop} = partition_hashes(hashes)
    :ok = send_summary(client, :pull, to_pull)
    :ok = send_summary(client, :drop, to_drop)
    backlog
  end

  # `:pull` - the peer asks for the messages behind `hashes`; hashes that are
  # not in the backlog are silently skipped.
  defp handle_summary(client, backlog, :pull, hashes) do
    Enum.each(hashes, fn mhash ->
      case Map.fetch(backlog, mhash) do
        {:ok, item} ->
          {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
          :ok = :gen_tcp.send(client, encd)

        :error ->
          :ok
      end
    end)

    backlog
  end

  # `:drop` - the peer no longer needs `hashes`; forget them.
  defp handle_summary(_client, backlog, :drop, hashes) do
    Map.drop(backlog, hashes)
  end

  # Drains the session queue of `key` into `backlog`.
  #
  # Returns `{backlog, added_ids}` where `added_ids` is `added_ids` extended
  # with every hash popped from the session.
  defp extract_backlog(key, backlog, added_ids) do
    case Floof.SessionManager.pop(key) do
      :empty ->
        {backlog, added_ids}

      {:value, {mhash, entry}} ->
        extract_backlog(key, Map.put(backlog, mhash, entry), MapSet.put(added_ids, mhash))
    end
  end

  # Sends a `{:summary, {:Summary, direction, hashes}}` message to `client`.
  #
  # `dcdids` may be any enumerable of hashes. Nothing is sent for an empty
  # enumerable and `:ok` is returned; otherwise the result of
  # `:gen_tcp.send/2` is returned.
  defp send_summary(client, direction, dcdids) do
    if Enum.empty?(dcdids) do
      :ok
    else
      dat = {:Summary, direction, Enum.to_list(dcdids)}
      {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
      :gen_tcp.send(client, encd)
    end
  end

  # Splits `xs` into `{to_pull, to_drop}` (both `MapSet`s) using the
  # `{hash, flag}` pairs returned by `Floof.Distributor.have_we_multi/1`:
  # a truthy flag puts the hash into `to_drop`, a falsy one into `to_pull`.
  # NOTE(review): presumably the flag means "we already have this message" -
  # confirm against `Floof.Distributor`.
  defp partition_hashes(xs) do
    xs
    |> Floof.Distributor.have_we_multi()
    |> Enum.reduce({MapSet.new(), MapSet.new()}, fn {key, cnd}, {to_pull, to_drop} ->
      if cnd do
        {to_pull, MapSet.put(to_drop, key)}
      else
        {MapSet.put(to_pull, key), to_drop}
      end
    end)
  end
end