Skip to content

Commit

Permalink
Merge pull request devinus#111 from zugolosian/workers_to_queue
Browse files Browse the repository at this point in the history
Convert worker list to queue for consistent lifo|fifo performance
  • Loading branch information
Vagabond authored Oct 6, 2018
2 parents 0fe289b + ccc091c commit 9212a87
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 60 deletions.
55 changes: 30 additions & 25 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

-record(state, {
supervisor :: undefined | pid(),
workers = [] :: [pid()],
workers :: undefined | pid_queue(),
waiting :: pid_queue(),
monitors :: ets:tid(),
size = 5 :: non_neg_integer(),
Expand Down Expand Up @@ -205,19 +205,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
workers = Workers,
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow} = State,
case Workers of
[Pid | Left] ->
max_overflow = MaxOverflow,
strategy = Strategy} = State,
case get_worker_with_strategy(Workers, Strategy) of
{{value, Pid}, Left} ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{workers = Left}};
[] when MaxOverflow > 0, Overflow < MaxOverflow ->
{empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
{reply, Pid, State#state{overflow = Overflow + 1}};
[] when Block =:= false ->
{empty, _Left} when Block =:= false ->
{reply, full, State};
[] ->
{empty, _Left} ->
MRef = erlang:monitor(process, FromPid),
Waiting = queue:in({From, CRef, MRef}, State#state.waiting),
{noreply, State#state{waiting = Waiting}}
Expand All @@ -228,7 +229,7 @@ handle_call(status, _From, State) ->
monitors = Monitors,
overflow = Overflow} = State,
StateName = state_name(State),
{reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State};
{reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
Expand Down Expand Up @@ -266,10 +267,10 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
NewState = handle_worker_exit(Pid, State),
{noreply, NewState};
[] ->
case lists:member(Pid, State#state.workers) of
case queue:member(Pid, State#state.workers) of
true ->
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
{noreply, State#state{workers = [new_worker(Sup) | W]}};
W = filter_worker_by_pid(Pid, State#state.workers),
{noreply, State#state{workers = queue:in(new_worker(Sup), W)}};
false ->
{noreply, State}
end
Expand All @@ -279,7 +280,8 @@ handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, State) ->
ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers),
Workers = queue:to_list(State#state.workers),
ok = lists:foreach(fun (W) -> unlink(W) end, Workers),
true = exit(State#state.supervisor, shutdown),
ok.

Expand All @@ -304,26 +306,33 @@ new_worker(Sup, FromPid) ->
Ref = erlang:monitor(process, FromPid),
{Pid, Ref}.

get_worker_with_strategy(Workers, fifo) ->
queue:out(Workers);
get_worker_with_strategy(Workers, lifo) ->
queue:out_r(Workers).

dismiss_worker(Sup, Pid) ->
true = unlink(Pid),
supervisor:terminate_child(Sup, Pid).

filter_worker_by_pid(Pid, Workers) ->
queue:filter(fun (WPid) -> WPid =/= Pid end, Workers).

prepopulate(N, _Sup) when N < 1 ->
[];
queue:new();
prepopulate(N, Sup) ->
prepopulate(N, Sup, []).
prepopulate(N, Sup, queue:new()).

prepopulate(0, _Sup, Workers) ->
Workers;
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, [new_worker(Sup) | Workers]).
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).

handle_checkin(Pid, State) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow,
strategy = Strategy} = State,
overflow = Overflow} = State,
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
Expand All @@ -333,10 +342,7 @@ handle_checkin(Pid, State) ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
Workers = case Strategy of
lifo -> [Pid | State#state.workers];
fifo -> State#state.workers ++ [Pid]
end,
Workers = queue:in(Pid, State#state.workers),
State#state{workers = Workers, waiting = Empty, overflow = 0}
end.

Expand All @@ -353,15 +359,14 @@ handle_worker_exit(Pid, State) ->
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
Workers =
[new_worker(Sup)
| lists:filter(fun (P) -> P =/= Pid end, State#state.workers)],
W = filter_worker_by_pid(Pid, State#state.workers),
Workers = queue:in(new_worker(Sup), W),
State#state{workers = Workers, waiting = Empty}
end.

state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
#state{max_overflow = MaxOverflow, workers = Workers} = State,
case length(Workers) == 0 of
case queue:len(Workers) == 0 of
true when MaxOverflow < 1 -> full;
true -> overflow;
false -> ready
Expand Down
70 changes: 35 additions & 35 deletions test/poolboy_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,37 +127,37 @@ transaction_timeout() ->
pool_startup() ->
%% Check basic pool operation.
{ok, Pid} = new_pool(10, 5),
?assertEqual(10, length(pool_call(Pid, get_avail_workers))),
?assertEqual(10, queue:len(pool_call(Pid, get_avail_workers))),
poolboy:checkout(Pid),
?assertEqual(9, length(pool_call(Pid, get_avail_workers))),
?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))),
Worker = poolboy:checkout(Pid),
?assertEqual(8, length(pool_call(Pid, get_avail_workers))),
?assertEqual(8, queue:len(pool_call(Pid, get_avail_workers))),
checkin_worker(Pid, Worker),
?assertEqual(9, length(pool_call(Pid, get_avail_workers))),
?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(1, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).

pool_overflow() ->
%% Check that the pool overflows properly.
{ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(7, length(pool_call(Pid, get_all_workers))),
[A, B, C, D, E, F, G] = Workers,
checkin_worker(Pid, A),
checkin_worker(Pid, B),
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, C),
checkin_worker(Pid, D),
?assertEqual(2, length(pool_call(Pid, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, E),
checkin_worker(Pid, F),
?assertEqual(4, length(pool_call(Pid, get_avail_workers))),
?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, G),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(0, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -167,7 +167,7 @@ pool_empty() ->
%% overflow is enabled.
{ok, Pid} = new_pool(5, 2),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(7, length(pool_call(Pid, get_all_workers))),
[A, B, C, D, E, F, G] = Workers,
Self = self(),
Expand All @@ -192,18 +192,18 @@ pool_empty() ->
after
500 -> ?assert(false)
end,
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, C),
checkin_worker(Pid, D),
?assertEqual(2, length(pool_call(Pid, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, E),
checkin_worker(Pid, F),
?assertEqual(4, length(pool_call(Pid, get_avail_workers))),
?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, G),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(0, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -213,7 +213,7 @@ pool_empty_no_overflow() ->
%% disabled.
{ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
[A, B, C, D, E] = Workers,
Self = self(),
Expand All @@ -238,14 +238,14 @@ pool_empty_no_overflow() ->
after
500 -> ?assert(false)
end,
?assertEqual(2, length(pool_call(Pid, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, C),
checkin_worker(Pid, D),
?assertEqual(4, length(pool_call(Pid, get_avail_workers))),
?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
checkin_worker(Pid, E),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(0, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -256,16 +256,16 @@ worker_death() ->
{ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
[A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(7, length(pool_call(Pid, get_all_workers))),
kill_worker(A),
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(6, length(pool_call(Pid, get_all_workers))),
kill_worker(B),
kill_worker(C),
?assertEqual(1, length(pool_call(Pid, get_avail_workers))),
?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(4, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -277,9 +277,9 @@ worker_death_while_full() ->
{ok, Pid} = new_pool(5, 2),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
[A, B|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(7, length(pool_call(Pid, get_all_workers))),
Self = self(),
spawn(fun() ->
Expand All @@ -306,7 +306,7 @@ worker_death_while_full() ->
1000 -> ?assert(false)
end,
kill_worker(B),
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(6, length(pool_call(Pid, get_all_workers))),
?assertEqual(6, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -318,9 +318,9 @@ worker_death_while_full_no_overflow() ->
{ok, Pid} = new_pool(5, 0),
Worker = poolboy:checkout(Pid),
kill_worker(Worker),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
[A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
Self = self(),
spawn(fun() ->
Expand All @@ -346,10 +346,10 @@ worker_death_while_full_no_overflow() ->
1000 -> ?assert(false)
end,
kill_worker(B),
?assertEqual(1, length(pool_call(Pid, get_avail_workers))),
?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
kill_worker(C),
?assertEqual(2, length(pool_call(Pid, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(3, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).
Expand All @@ -359,7 +359,7 @@ pool_full_nonblocking_no_overflow() ->
%% option to use non-blocking checkouts is used.
{ok, Pid} = new_pool(5, 0),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(full, poolboy:checkout(Pid, false)),
?assertEqual(full, poolboy:checkout(Pid, false)),
Expand All @@ -374,7 +374,7 @@ pool_full_nonblocking() ->
%% option to use non-blocking checkouts is used.
{ok, Pid} = new_pool(5, 5),
Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)],
?assertEqual(0, length(pool_call(Pid, get_avail_workers))),
?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(10, length(pool_call(Pid, get_all_workers))),
?assertEqual(full, poolboy:checkout(Pid, false)),
A = hd(Workers),
Expand All @@ -395,17 +395,17 @@ owner_death() ->
receive after 500 -> exit(normal) end
end),
timer:sleep(1000),
?assertEqual(5, length(pool_call(Pid, get_avail_workers))),
?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))),
?assertEqual(5, length(pool_call(Pid, get_all_workers))),
?assertEqual(0, length(pool_call(Pid, get_all_monitors))),
ok = pool_call(Pid, stop).

checkin_after_exception_in_transaction() ->
{ok, Pool} = new_pool(2, 0),
?assertEqual(2, length(pool_call(Pool, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))),
Tx = fun(Worker) ->
?assert(is_pid(Worker)),
?assertEqual(1, length(pool_call(Pool, get_avail_workers))),
?assertEqual(1, queue:len(pool_call(Pool, get_avail_workers))),
throw(it_on_the_ground),
?assert(false)
end,
Expand All @@ -414,7 +414,7 @@ checkin_after_exception_in_transaction() ->
catch
throw:it_on_the_ground -> ok
end,
?assertEqual(2, length(pool_call(Pool, get_avail_workers))),
?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))),
ok = pool_call(Pool, stop).

pool_returns_status() ->
Expand Down

0 comments on commit 9212a87

Please sign in to comment.