Skip to content

Commit

Permalink
Merge pull request #13 from rabbitmq/emit_state_on_register
Browse files Browse the repository at this point in the history
Emit current node state on register
  • Loading branch information
kjnilsson committed Dec 10, 2018
2 parents 8c3c362 + 634f9a0 commit 3274068
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 8 deletions.
22 changes: 19 additions & 3 deletions src/aten_detector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,27 @@ handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.

handle_cast({register, Node, Pid}, #state{watchers = Watchers0} = State) ->
handle_cast({register, Node, Pid},
#state{watchers = Watchers0,
node_states = Curr,
threshold = Thresh} = State) ->
Pids0 = maps:get(Node, Watchers0, #{}),
Pids = case Pids0 of
#{Pid := _Mon} -> Pids0;
#{} -> Pids0#{Pid => erlang:monitor(process, Pid)}
#{Pid := _Mon} ->
Pids0;
#{} ->
%% this is a new registration, emit the current state
case Curr of
#{Node := Last} when Last < Thresh ->
%% the node is known and active
Pid ! {node_event, Node, up},
ok;
_ ->
%% otherwise it must be down
Pid ! {node_event, Node, down},
ok
end,
Pids0#{Pid => erlang:monitor(process, Pid)}
end,
Watchers = maps:put(Node, Pids, Watchers0),
{noreply, State#state{watchers = Watchers}};
Expand Down
83 changes: 78 additions & 5 deletions test/aten_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ all_tests() ->
detect_node_partition,
detect_node_stop_start,
unregister_does_not_detect,
register_unknown_emits_down,
register_detects_down,
watchers_cleanup
].

Expand Down Expand Up @@ -49,14 +51,19 @@ end_per_testcase(_Case, _Config) ->
ok.

detect_node_partition(_Config) ->
S1 = make_node_name(s1),
{ok, S1} = start_slave(s1),
S1 = make_node_name(?FUNCTION_NAME),
ok = aten:register(S1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
{ok, S1} = start_slave(?FUNCTION_NAME),
ct:pal("Node ~w Nodes ~w", [node(), nodes()]),
receive
{node_event, S1, up} -> ok
after 5000 ->
exit(node_event_timeout)
exit(node_event_timeout)
end,
%% give it enough time to generate more than one sample
timer:sleep(1000),
Expand All @@ -65,14 +72,16 @@ detect_node_partition(_Config) ->
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
flush(),
exit(node_event_timeout)
end,
meck:unload(aten_sink),

receive
{node_event, S1, up} -> ok
after 5000 ->
exit(node_event_timeout)
flush(),
exit(node_event_timeout)
end,
ok = slave:stop(S1),
ok = aten:unregister(S1),
Expand Down Expand Up @@ -112,6 +121,11 @@ detect_node_stop_start(_Config) ->
unregister_does_not_detect(_Config) ->
S1 = make_node_name(s1),
ok = aten:register(S1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
{ok, S1} = start_slave(s1),
ct:pal("Node ~w Nodes ~w", [node(), nodes()]),
receive
Expand All @@ -128,11 +142,61 @@ unregister_does_not_detect(_Config) ->
end,
ok.

register_unknown_emits_down(_Config) ->
S1 = make_node_name(disconnected_node),
ok = aten:register(S1),
% {ok, S1} = start_slave(s1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
ok = aten:unregister(S1),
ok.

register_detects_down(_Config) ->
S1 = make_node_name(s1),
ok = aten:register(S1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
{ok, S1} = start_slave(s1),
timer:sleep(500),
simulate_partition(S1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
ok = aten:unregister(S1),
%% re-register should detect down
ok = aten:register(S1),
receive
{node_event, S1, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
ok = aten:unregister(S1),
ok.

watchers_cleanup(_Config) ->
Node = make_node_name(s1),
Self = self(),
Watcher = spawn_watcher(Node, Self),
ok = aten:register(Node),
%% first clear out all the initial notifications
receive
{watcher_node_down, Node} -> ok
after 5000 ->
exit(node_event_timeout)
end,
receive
{node_event, Node, down} -> ok
after 5000 ->
exit(node_event_timeout)
end,
{ok, Node} = start_slave(s1),
ct:pal("Node ~w Nodes ~w", [node(), nodes()]),
receive
Expand Down Expand Up @@ -228,3 +292,12 @@ start_slave(N) ->
after_char(_, []) -> [];
after_char(Char, [Char|Rest]) -> Rest;
after_char(Char, [_|Rest]) -> after_char(Char, Rest).


flush() ->
receive M ->
ct:pal("flushed ~w~n", [M]),
flush()
after 100 ->
ok
end.

0 comments on commit 3274068

Please sign in to comment.