Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to control the dequeue #421

Merged
merged 9 commits into from
Aug 21, 2020
Prev Previous commit
Next Next commit
add documentation
  • Loading branch information
ananthakumaran committed Jul 28, 2020
commit 697fea93c9280ad802100a1e7d45c4cfa2fd305e
35 changes: 34 additions & 1 deletion lib/exq/dequeue/behaviour.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,38 @@
defmodule Exq.Dequeue.Behaviour do
@callback init(info :: map, args :: term) :: {:ok, term}
@moduledoc """
Custom concurreny or rate limiting at a queue level can be achieved
by implementing the Dequeue behaviour

The following config can be used to customize dequeue behaviour for a queue

config :exq,
queues: [{"default", {RateLimiter, options}}]

RateLimiter module should implement `Exq.Dequeue.Behaviour`. The
options supplied here would be passed as the second argument to the
`c:init/2` function.

### Life cycle

`c:init/2` will be invoked on initialization. The first argument will contain info
like queue and the second argument is user configurable.

`c:available?/1` will be invoked before each poll. If the
returned value contains `false` as the second element of the tuple,
the queue will not polled

`c:dispatched/1` will be invoked once a job is dispatched to the worker

`c:processed/1` will be invoked if a job completed successfully

`c:failed/1` will be invoked if a job failed

`c:stop/1` will be invoked when a queue is unsubscribed or before the
node terminates. Note: there is no guarantee this will be invoked if
the node terminates abruptly
"""

@callback init(info :: %{queue: String.t()}, args :: term) :: {:ok, term}
@callback stop(state :: term) :: :ok
@callback available?(state :: term) :: {:ok, boolean, term}
@callback dispatched(state :: term) :: {:ok, term}
Expand Down
2 changes: 2 additions & 0 deletions lib/exq/dequeue/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Exq.Dequeue.Local do
@behaviour Exq.Dequeue.Behaviour

defmodule State do
@moduledoc false

defstruct max: nil, current: 0
end

Expand Down