defmodule Floof do @moduledoc """ Documentation for `Floof`. """ require Logger def accept(port, opts \\ []) do {:ok, socket} = :gen_tcp.listen( port, [ :binary, packet: 4, active: false, reuseaddr: true ] ++ opts ) try do Logger.info("Accepting connections on #{port}") loop_acceptor(socket) after :ok = :gen_tcp.close(socket) end end defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(Floof.TaskSupervisor, fn -> try do serve(client, %{}, nil) after :ok = :gen_tcp.close(client) end end) :ok = :gen_tcp.controlling_process(client, pid) # switch to active now that we handed over the client :inet.setopts(client, [{:active, true}]) Floof.Distributor.register(pid) loop_acceptor(socket) end def connect(host, port, sesskey \\ nil) do {:ok, socket} = :gen_tcp.connect(host, port, packet: 4, active: true ) try do Floof.Distributor.register(self()) Logger.info("Established connection to #{inspect(host)}:#{port}") if sesskey != nil do {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:session, {:attach, sesskey}}) :ok = :gen_tcp.send(socket, encd) Floof.SessionManager.attach(sesskey, self()) end serve(socket, %{}, sesskey) after :ok = :gen_tcp.close(socket) end end defp serve(client, backlog, sesskey) do {backlog, sesskey} = receive do {:tcp, _, message} -> message = if :erlang.is_binary(message) do message else :erlang.list_to_binary(message) end {:ok, dcd} = :FloofProtocol.decode(:ProtoMessage, message) handle_incoming(client, backlog, sesskey, dcd) {:fwdxfer, {dcdhash, dcd}} -> backlog = if sesskey != nil do # make sure that stuff gets handled uniformly Floof.SessionManager.set_soft_multi(sesskey, [dcdhash]) Floof.PacketSpool.store_new(dcdhash, dcd) else :ok = send_summary(client, :requestpull, [dcdhash]) Map.put(backlog, dcdhash, dcd) end {backlog, sesskey} {:fwddrop, dcdhash} -> to_drop = coalesce_fwddrops(MapSet.new([dcdhash])) :ok = send_summary(client, :drop, to_drop) {backlog, sesskey} {:SessionPushed, sesskey} -> sesskeys = coalesce_sesspushes([sesskey]) [sesskey | _] = sesskeys dcdids = Enum.map(MapSet.new(sesskeys), &MapSet.new(Floof.SessionManager.peek(&1))) dcdids = Enum.reduce(dcdids, &MapSet.union(&1, &2)) Logger.debug("presenting session content #{inspect(dcdids)})") :ok = send_summary(client, :requestpull, dcdids) {backlog, sesskey} {:SessionDetached, old_sesskey} -> sesskey2 = if sesskey == old_sesskey do nil else sesskey end {backlog, sesskey2} {:tcp_closed, _} -> # put all the backlogged stuff back if possible if sesskey != nil do transfer_backlog_to_session(sesskey, backlog) Floof.SessionManager.detach(sesskey, self()) end exit(:normal) x -> Logger.warn("unable to handle request: #{inspect(x)}") {backlog, sesskey} end serve(client, backlog, sesskey) end defp handle_incoming(client, backlog, sesskey, dcd) do Logger.debug("got packet #{inspect(dcd)}") case dcd do {:session, {:attach, sesskey_new}} -> if sesskey != nil do Floof.SessionManager.detach(sesskey, self()) end Floof.SessionManager.attach(sesskey_new, self()) transfer_backlog_to_session(sesskey_new, backlog) {%{}, sesskey_new} {:summary, {:Summary, direction, hashes}} -> case direction do :requestpull -> {to_pull, to_drop} = partition_hashes(hashes) :ok = send_summary(client, :pull, to_pull) :ok = send_summary(client, :drop, to_drop) {backlog, sesskey} :pull -> for mhash <- hashes do mbitem = case sesskey do nil -> Map.fetch(backlog, mhash) _ -> Floof.PacketSpool.fetch(mhash) end case mbitem do {:ok, item} -> {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:xfer, item}) :ok = :gen_tcp.send(client, encd) _ -> Logger.warn("packet #{Base.url_encode64(mhash)} gone missing") nil end end {backlog, sesskey} :drop -> if sesskey != nil do Floof.SessionManager.drop(sesskey, hashes) end {Map.drop(backlog, hashes), sesskey} end {:xfer, xf} -> mhash = Floof.Message.emit(xf) # the following indirection is necessary # to reduce the amount of 'drop' notifications send(self(), {:fwddrop, mhash}) {backlog, sesskey} end end defp send_summary(client, direction, dcdids) do if not Enum.empty?(dcdids) do dat = {:Summary, direction, Enum.to_list(dcdids)} {:ok, encd} = :FloofProtocol.encode(:ProtoMessage, {:summary, dat}) :gen_tcp.send(client, encd) else :ok end end defp transfer_backlog_to_session(sesskey, backlog) do Floof.SessionManager.set_soft_multi(sesskey, for({dcdhash, _} <- backlog, do: dcdhash)) :ok = Floof.PacketSpool.store_new_multi(backlog) end defp coalesce_sesspushes(sesskeys) do receive do {:SessionPushed, sesskey} -> coalesce_sesspushes([sesskey | sesskeys]) after 10 -> sesskeys end end defp coalesce_fwddrops(to_drop) do receive do {:fwddrop, dcdhash} -> coalesce_fwddrops(MapSet.put(to_drop, dcdhash)) after 10 -> to_drop end end defp partition_hashes(xs) do xs |> Floof.DistributorSeen.have_we_multi() |> Enum.to_list() |> partition_list(MapSet.new(), MapSet.new()) end defp partition_list([], lo, hi) do {lo, hi} end defp partition_list([{key, cnd} | xs], lo, hi) do {lo2, hi2} = if cnd do {lo, MapSet.put(hi, key)} else {MapSet.put(lo, key), hi} end partition_list(xs, lo2, hi2) end end