108 lines
3.3 KiB
Elixir
108 lines
3.3 KiB
Elixir
defmodule Floof do
  @moduledoc """
  TCP transport for Floof peers.

  `accept/1` listens for inbound peers and `connect/2` dials an outbound peer.
  Both sides run the same per-connection loop, exchanging `:FloofProtocol`
  encoded `:ProtoMessage` frames over a socket in `packet: 4` mode.

  Each connection keeps a backlog (a map of message id to message) of items
  that were offered to the peer. The peers negotiate with summaries:

    * `:requestpull` - sent when a message is added to the backlog
    * `:pull` - answer to `:requestpull` when `Floof.Distributor.have_we/1`
      is falsy; the receiver replies with the backlog item as an `:xfer`
    * `:drop` - answer to `:requestpull` when the message is already known,
      and acknowledgement after an `:xfer` was emitted; the receiver removes
      the id from its backlog
  """

  require Logger

  @doc """
  Listens on `port` and serves every inbound connection in a task started
  under `Floof.TaskSupervisor`.

  Does not return normally: the accept loop recurses forever and crashes on
  any listen/accept failure.
  """
  @spec accept(:inet.port_number()) :: no_return()
  def accept(port) do
    listen_opts = [:binary, packet: 4, active: false, reuseaddr: true]
    {:ok, socket} = :gen_tcp.listen(port, listen_opts)
    Logger.info("Accepting connections on #{port}")
    loop_acceptor(socket)
    # Only reached if the accept loop ever returns, which it currently never does.
    :ok = :gen_tcp.close(socket)
  end

  # Accepts one client, hands it to a supervised task running `serve/2`,
  # registers that task with the distributor, then waits for the next client.
  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)

    {:ok, pid} =
      Task.Supervisor.start_child(Floof.TaskSupervisor, fn ->
        serve(client, %{})
        :ok = :gen_tcp.close(client)
      end)

    :ok = :gen_tcp.controlling_process(client, pid)
    # switch to active now that we handed over the client
    :inet.setopts(client, [{:active, true}])
    Floof.Distributor.register(pid)
    loop_acceptor(socket)
  end

  @doc """
  Connects to `host`:`port`, registers the calling process with
  `Floof.Distributor` and serves the connection in the calling process.

  `host` may be anything `:gen_tcp.connect/3` accepts (charlist, atom or IP
  tuple) or an Elixir string, which is converted to a charlist.

  Blocks for the lifetime of the connection and returns `:ok` once the peer
  has closed it. Raises a `MatchError` if the connection cannot be established.
  """
  @spec connect(:inet.socket_address() | :inet.hostname() | String.t(), :inet.port_number()) ::
          :ok
  def connect(host, port) do
    # :gen_tcp only understands charlist host names; accept Elixir strings too.
    address = if is_binary(host), do: String.to_charlist(host), else: host

    # `:binary` keeps this side consistent with `accept/1`; without it incoming
    # payloads would be delivered as lists of bytes instead of binaries.
    {:ok, socket} = :gen_tcp.connect(address, port, [:binary, packet: 4, active: true])

    Floof.Distributor.register(self())
    Logger.info("Established connection to #{host}:#{port}")
    serve(socket, %{})
    :ok = :gen_tcp.close(socket)
  end

  # Per-connection message loop. `backlog` maps message ids to messages that
  # were offered to the peer and not yet dropped. Returns `:ok` when the socket
  # reports that it was closed; every other message keeps the loop running.
  defp serve(client, backlog) do
    receive do
      {:tcp, _, message} ->
        {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
        serve(client, handle_incoming(client, backlog, dcd))

      {:tcp_closed, _} ->
        # Stop looping so the caller's `:gen_tcp.close/1` runs and the process
        # can finish instead of idling on a dead socket.
        Logger.debug("connection closed")
        :ok

      {:fwdxfer, {dcdid, dcd}} ->
        # Offer the message to the peer and remember it until it is dropped.
        send_summary(client, :requestpull, dcdid)
        serve(client, Map.put(backlog, dcdid, dcd))

      {:SessionPushed, key} ->
        Floof.Distributor.register_key(key, self())
        {backlog, added_ids} = extract_backlog(key, backlog, MapSet.new())
        Enum.each(added_ids, &send_summary(client, :requestpull, &1))
        serve(client, backlog)

      x ->
        Logger.warn("unable to handle request: #{inspect(x)}")
        serve(client, backlog)
    end
  end

  # Handles one decoded packet from the peer and returns the updated backlog.
  # Packets of an unknown shape crash the connection process.
  defp handle_incoming(client, backlog, dcd) do
    Logger.debug("got packet #{inspect(dcd)}")

    case dcd do
      {:session, {:SessionModify, {:attach, key}}} ->
        # The backlog of the session is pulled in asynchronously by `serve/2`.
        if Floof.SessionManager.attach(key, self()) do
          send(self(), {:SessionPushed, key})
        end

        backlog

      {:summary, {:Summary, direction, id}} ->
        handle_summary(client, backlog, direction, id)

      {:xfer, xf} ->
        id = Floof.Message.emit(xf)
        # notify the remote end that we successfully consumed the message
        send_summary(client, :drop, id)
        backlog
    end
  end

  # Reacts to a summary received from the peer; returns the updated backlog.

  # The peer offers `id`: ask for it unless the distributor already has it.
  defp handle_summary(client, backlog, :requestpull, id) do
    answer = if Floof.Distributor.have_we(id), do: :drop, else: :pull
    send_summary(client, answer, id)
    backlog
  end

  # The peer wants `id`: transfer it if it is still in the backlog.
  defp handle_summary(client, backlog, :pull, id) do
    case Map.fetch(backlog, id) do
      {:ok, item} ->
        {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item})
        :ok = :gen_tcp.send(client, encd)

      :error ->
        nil
    end

    backlog
  end

  # The peer no longer needs `id`: forget it.
  defp handle_summary(_client, backlog, :drop, id) do
    Map.delete(backlog, id)
  end

  # Pops entries for `key` from the session manager until it reports `:empty`,
  # merging them into `backlog`. Returns `{backlog, ids_added}`.
  defp extract_backlog(key, backlog, added_ids) do
    case Floof.SessionManager.pop(key) do
      :empty ->
        {backlog, added_ids}

      {:value, {id, entry}} ->
        extract_backlog(key, Map.put(backlog, id, entry), MapSet.put(added_ids, id))
    end
  end

  # Encodes and sends a `{:Summary, direction, dcdid}` frame to the peer.
  # Crashes with a `MatchError` if encoding or sending fails.
  defp send_summary(client, direction, dcdid) do
    dat = {:Summary, direction, dcdid}
    {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
    :ok = :gen_tcp.send(client, encd)
  end
end
|