SessMgr/Distr: improve handling of session hand-over
This commit is contained in:
parent
f4ae49db83
commit
4c5c0cb865
|
@ -106,7 +106,6 @@ defmodule Floof do
|
||||||
|
|
||||||
case dcd do
|
case dcd do
|
||||||
{:session, {:attach, key}} ->
|
{:session, {:attach, key}} ->
|
||||||
Floof.Distributor.register_key(key, self())
|
|
||||||
Floof.SessionManager.attach(key, self())
|
Floof.SessionManager.attach(key, self())
|
||||||
Floof.SessionManager.set_multi(sesskey, backlog)
|
Floof.SessionManager.set_multi(sesskey, backlog)
|
||||||
backlog
|
backlog
|
||||||
|
|
|
@ -17,7 +17,7 @@ defmodule Floof.Distributor do
|
||||||
pubkey_config: %{:keydb => ""}
|
pubkey_config: %{:keydb => ""}
|
||||||
)
|
)
|
||||||
|
|
||||||
Record.defrecord(:dstate, fconf: {}, trgs: MapSet.new(), keys: %{}, seen: nil)
|
Record.defrecord(:dstate, fconf: {}, trgs: MapSet.new(), keys: MapSet.new(), seen: nil)
|
||||||
|
|
||||||
def start_link(initval) do
|
def start_link(initval) do
|
||||||
GenServer.start_link(__MODULE__, initval, name: __MODULE__)
|
GenServer.start_link(__MODULE__, initval, name: __MODULE__)
|
||||||
|
@ -51,32 +51,20 @@ defmodule Floof.Distributor do
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_cast({:register, pid}, state) do
|
def handle_cast({:register, pid}, state) do
|
||||||
Process.link(pid)
|
if :erlang.is_process_alive(pid) do
|
||||||
{:noreply, dstate(state, trgs: MapSet.put(dstate(state, :trgs), pid))}
|
Process.link(pid)
|
||||||
|
{:noreply, dstate(state, trgs: MapSet.put(dstate(state, :trgs), pid))}
|
||||||
|
else
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_cast({:registerKey, key, pid}, state) do
|
def handle_cast({:registerKey, key, pid}, state) do
|
||||||
trgs = dstate(state, :trgs)
|
trgs = dstate(state, :trgs)
|
||||||
keys = dstate(state, :keys)
|
keys = dstate(state, :keys)
|
||||||
|
Process.unlink(pid)
|
||||||
trgs =
|
{:noreply, dstate(state, trgs: MapSet.delete(trgs, pid), keys: MapSet.put(keys, key))}
|
||||||
case Map.get(keys, key) do
|
|
||||||
nil ->
|
|
||||||
trgs
|
|
||||||
|
|
||||||
oldpid ->
|
|
||||||
if :erlang.is_process_alive(oldpid) do
|
|
||||||
# the processes in trgs are always linked, those in keys aren't
|
|
||||||
Process.link(oldpid)
|
|
||||||
Process.unlink(pid)
|
|
||||||
MapSet.put(trgs, oldpid)
|
|
||||||
else
|
|
||||||
trgs
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
{:noreply, dstate(state, trgs: MapSet.delete(trgs, pid), keys: Map.put(keys, key, pid))}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
|
|
@ -6,8 +6,11 @@ defmodule Floof.SessionManager do
|
||||||
end
|
end
|
||||||
|
|
||||||
def attach(key, pid) do
|
def attach(key, pid) do
|
||||||
|
Floof.Distributor.register_key(key, pid)
|
||||||
|
|
||||||
modify_subscription(key, fn oldpid, m ->
|
modify_subscription(key, fn oldpid, m ->
|
||||||
if oldpid != nil and oldpid != pid do
|
if oldpid != nil and oldpid != pid do
|
||||||
|
Floof.Distributor.register(oldpid)
|
||||||
send(oldpid, {:SessionDetached, key})
|
send(oldpid, {:SessionDetached, key})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue