floof/lib/floof.ex

102 lines
3.2 KiB
Elixir
Raw Normal View History

2022-11-22 15:41:59 +00:00
defmodule Floof do
@moduledoc """
Documentation for `Floof`.
"""
require Logger
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port, [
:binary,
packet: 4,
active: false,
reuseaddr: true
])
Logger.info("Accepting connections on #{port}")
loop_acceptor(socket)
end
2022-11-22 23:17:46 +00:00
defp loop_acceptor(socket) do
2022-11-22 15:41:59 +00:00
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(
Floof.TaskSupervisor, fn -> serve(client, %{}) end)
2022-11-22 15:41:59 +00:00
:ok = :gen_tcp.controlling_process(client, pid)
2022-11-22 23:17:46 +00:00
# switch to active now that we handed over the client
:inet.setopts(client, [{:active, true}])
send(Floof.Distributor, {:register, pid})
2022-11-22 15:41:59 +00:00
loop_acceptor(socket)
end
2022-11-22 23:17:46 +00:00
defp serve(client, backlog) do
receive do
{:tcp, _, message} ->
{:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message)
2022-11-24 00:33:43 +00:00
backlog = handle_incoming(client, backlog, dcd)
serve(client, backlog)
2022-11-24 08:46:49 +00:00
{:fwdxfer, {dcdid, dcd}} ->
2022-11-22 23:17:46 +00:00
handle_outgoing(client, dcdid)
serve(client, Map.put(backlog, dcdid, dcd))
2022-11-22 23:17:46 +00:00
{:havewe, id, res} ->
dat = {:Summary, if res do :rejectpush else :pull end, id}
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
:ok = client.send(encd)
serve(client, backlog)
2022-11-24 00:33:43 +00:00
{:SessionPushed, key} ->
{backlog, added_ids} = extract_backlog(key, backlog, MapSet.new())
2022-11-24 00:33:43 +00:00
for id <- added_ids do
dat = {:Summary, :requestpull, id}
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
:ok = client.send(encd)
end
serve(client, backlog)
2022-11-22 23:17:46 +00:00
x -> Logger.warn("unable to handle request: #{inspect(x)}")
serve(client, backlog)
end
end
defp handle_incoming(client, backlog, dcd) do
Logger.debug("got packet #{inspect(dcd)}")
case dcd do
2022-11-24 00:33:43 +00:00
{:session, {:SessionModify, {:attach, key}}} ->
if Floof.SessionManager.attach(key, self()) do
send self(), {:SessionPushed, key}
end
2022-11-24 08:46:49 +00:00
backlog
2022-11-22 23:17:46 +00:00
{:summary, {:Summary, direction, id}} ->
case direction do
:requestpull ->
2022-11-24 00:33:43 +00:00
send Floof.Distributor, {:havewe, self(), id}
2022-11-22 23:17:46 +00:00
backlog
:pull ->
{item, backlog2} = Map.pop(backlog, id)
if item != nil do
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, {id, item}})
:ok = client.send(encd)
2022-11-22 23:17:46 +00:00
end
backlog2
2022-11-22 23:17:46 +00:00
:rejectpush ->
Map.delete(backlog, id)
2022-11-22 23:17:46 +00:00
end
{:xfer, xf} ->
2022-11-24 08:46:49 +00:00
{:XferBlob, _, _, _, _, _, xfinner} = xf
{:ok, {:XferInner, id, _, _, _, _}} = :FloofProtocol.decode(:XferInner, xfinner)
send Floof.Distributor, {:xfer, {id, xf}}
2022-11-22 23:17:46 +00:00
backlog
end
end
defp handle_outgoing(client, dcdid) do
dat = {:Summary, :requestpull, dcdid}
{:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat})
:ok = client.send(encd)
end
2022-11-24 00:33:43 +00:00
defp extract_backlog(key, backlog, added_ids) do
case Floof.SessionManager.pop(key) do
:empty -> {backlog, added_ids}
{:value, {id, entry}} ->
extract_backlog(key, Map.put(backlog, id, entry), MapSet.put(added_ids, id))
2022-11-24 00:33:43 +00:00
end
end
2022-11-22 15:41:59 +00:00
end