Skip to content

Commit

Permalink
Merge pull request devinus#70 from fishcakez/cancel_racecon
Browse files Browse the repository at this point in the history
Fix checkout timeout race condition
  • Loading branch information
Devin Torres committed Apr 10, 2015
2 parents 42c6317 + 1574805 commit 5c1c58b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 30 deletions.
66 changes: 40 additions & 26 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ checkout(Pool, Block) ->
-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout())
-> pid() | full.
checkout(Pool, Block, Timeout) ->
CRef = make_ref(),
try
gen_server:call(Pool, {checkout, Block}, Timeout)
gen_server:call(Pool, {checkout, CRef, Block}, Timeout)
catch
Class:Reason ->
gen_server:cast(Pool, {cancel_waiting, self()}),
gen_server:cast(Pool, {cancel_waiting, CRef}),
erlang:raise(Class, Reason, erlang:get_stacktrace())
end.

Expand Down Expand Up @@ -146,44 +147,56 @@ init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->

handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
[{Pid, _, MRef}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
{noreply, State}
end;

handle_cast({cancel_waiting, Pid}, State) ->
Waiting = queue:filter(fun ({{P, _}, Ref}) ->
P =/= Pid orelse not(erlang:demonitor(Ref))
end, State#state.waiting),
{noreply, State#state{waiting = Waiting}};
handle_cast({cancel_waiting, CRef}, State) ->
case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
[[Pid, MRef]] ->
demonitor(MRef, [flush]),
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
demonitor(MRef, [flush]),
false;
(_) ->
true
end,
Waiting = queue:filter(Cancel, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;

handle_cast(_Msg, State) ->
{noreply, State}.

handle_call({checkout, Block}, {FromPid, _} = From, State) ->
handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
#state{supervisor = Sup,
workers = Workers,
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow} = State,
case Workers of
[Pid | Left] ->
Ref = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{workers = Left}};
[] when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, Ref} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, Ref}),
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{overflow = Overflow + 1}};
[] when Block =:= false ->
{reply, full, State};
[] ->
Ref = erlang:monitor(process, FromPid),
Waiting = queue:in({From, Ref}, State#state.waiting),
MRef = erlang:monitor(process, FromPid),
Waiting = queue:in({From, CRef, MRef}, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;

Expand All @@ -201,30 +214,31 @@ handle_call(get_all_workers, _From, State) ->
WorkerList = supervisor:which_children(Sup),
{reply, WorkerList, State};
handle_call(get_all_monitors, _From, State) ->
Monitors = ets:tab2list(State#state.monitors),
Monitors = ets:select(State#state.monitors,
[{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
{reply, Monitors, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.

handle_info({'DOWN', Ref, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', Ref}) of
handle_info({'DOWN', MRef, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
[[Pid]] ->
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
{noreply, NewState};
[] ->
Waiting = queue:filter(fun ({_, R}) -> R =/= Ref end, State#state.waiting),
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, Ref}] ->
true = erlang:demonitor(Ref),
[{Pid, _, MRef}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_worker_exit(Pid, State),
{noreply, NewState};
Expand Down Expand Up @@ -288,8 +302,8 @@ handle_checkin(Pid, State) ->
overflow = Overflow,
strategy = Strategy} = State,
case queue:out(Waiting) of
{{value, {From, Ref}}, Left} ->
true = ets:insert(Monitors, {Pid, Ref}),
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
Expand All @@ -308,9 +322,9 @@ handle_worker_exit(Pid, State) ->
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
{{value, {From, Ref}}, LeftWaiting} ->
{{value, {From, CRef, MRef}}, LeftWaiting} ->
NewWorker = new_worker(State#state.supervisor),
true = ets:insert(Monitors, {NewWorker, Ref}),
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
Expand Down
52 changes: 48 additions & 4 deletions test/poolboy_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ pool_test_() ->
},
{<<"Pool reuses waiting monitor when a worker exits">>,
fun reuses_waiting_monitor_on_worker_exit/0
}
},
{<<"Recover from timeout without exit handling">>,
fun transaction_timeout_without_exit/0},
{<<"Recover from transaction timeout">>,
fun transaction_timeout/0}
]
}.

Expand All @@ -88,6 +92,38 @@ checkin_worker(Pid, Worker) ->
poolboy:checkin(Pid, Worker),
timer:sleep(500).


transaction_timeout_without_exit() ->
{ok, Pid} = new_pool(1, 0),
?assertEqual({ready,1,0,0}, pool_call(Pid, status)),
WorkerList = pool_call(Pid, get_all_workers),
?assertMatch([_], WorkerList),
spawn(poolboy, transaction, [Pid,
fun(Worker) ->
ok = pool_call(Worker, work)
end,
0]),
timer:sleep(100),
?assertEqual(WorkerList, pool_call(Pid, get_all_workers)),
?assertEqual({ready,1,0,0}, pool_call(Pid, status)).


transaction_timeout() ->
{ok, Pid} = new_pool(1, 0),
?assertEqual({ready,1,0,0}, pool_call(Pid, status)),
WorkerList = pool_call(Pid, get_all_workers),
?assertMatch([_], WorkerList),
?assertExit(
{timeout, _},
poolboy:transaction(Pid,
fun(Worker) ->
ok = pool_call(Worker, work)
end,
0)),
?assertEqual(WorkerList, pool_call(Pid, get_all_workers)),
?assertEqual({ready,1,0,0}, pool_call(Pid, status)).


pool_startup() ->
%% Check basic pool operation.
{ok, Pid} = new_pool(10, 5),
Expand Down Expand Up @@ -431,15 +467,16 @@ demonitors_previously_waiting_processes() ->

demonitors_when_checkout_cancelled() ->
{ok, Pool} = new_pool(1,0),
Self = self(),
Pid = spawn(fun() ->
poolboy:checkout(Pool),
poolboy:checkout(Pool),
_ = (catch poolboy:checkout(Pool, true, 1000)),
Self ! ok,
receive ok -> ok end
end),
timer:sleep(500),
?assertEqual(2, length(get_monitors(Pool))),
gen_server:cast(Pool, {cancel_waiting, Pid}),
timer:sleep(500),
receive ok -> ok end,
?assertEqual(1, length(get_monitors(Pool))),
Pid ! ok,
ok = pool_call(Pool, stop).
Expand Down Expand Up @@ -480,14 +517,21 @@ reuses_waiting_monitor_on_worker_exit() ->
end),

Worker = receive {worker, Worker} -> Worker end,
Ref = monitor(process, Worker),
exit(Worker, kill),
receive
{'DOWN', Ref, _, _, _} ->
ok
end,

?assertEqual(1, length(get_monitors(Pool))),

Pid ! ok,
ok = pool_call(Pool, stop).

get_monitors(Pid) ->
%% Synchronise with the Pid to ensure it has handled all expected work.
_ = sys:get_status(Pid),
[{monitors, Monitors}] = erlang:process_info(Pid, [monitors]),
Monitors.

Expand Down

0 comments on commit 5c1c58b

Please sign in to comment.