Skip to content

Commit

Permalink
Merge pull request akira#331 from LysanderGG/system_config
Browse files Browse the repository at this point in the history
Support load from runtime env for more config params
  • Loading branch information
akira committed Oct 14, 2018
2 parents db8a5c7 + 76ae9ae commit b955550
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 11 deletions.
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ config :exq,
poll_timeout: 100,
redis_timeout: 5000,
genserver_timeout: 5000,
shutdown_timeout: 5000,
reconnect_on_sleep: 100,
dead_max_jobs: 10_000,
dead_timeout_in_seconds: 180 * 24 * 60 * 60, # 6 months
Expand Down
10 changes: 8 additions & 2 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ defmodule Exq.Redis.JobQueue do
retry_or_fail_job(redis, namespace, job, error, retry)
end
def retry_or_fail_job(redis, namespace, %{retry: true} = job, error) do
retry_or_fail_job(redis, namespace, job, error, Config.get(:max_retries))
retry_or_fail_job(redis, namespace, job, error, get_max_retries())
end
def retry_or_fail_job(redis, namespace, job, error) do
fail_job(redis, namespace, job, error)
Expand Down Expand Up @@ -373,8 +373,14 @@ defmodule Exq.Redis.JobQueue do
end
def to_job_serialized(queue, worker, args, options, enqueued_at) do
jid = UUID.uuid4
retry = Keyword.get_lazy(options, :max_retries, fn() -> Config.get(:max_retries) end)
retry = Keyword.get_lazy(options, :max_retries, fn() -> get_max_retries() end)
job = %{queue: queue, retry: retry, class: worker, args: args, jid: jid, enqueued_at: enqueued_at}
{jid, Config.serializer.encode!(job)}
end

defp get_max_retries do
:max_retries
|> Config.get()
|> Exq.Support.Coercion.to_integer()
end
end
17 changes: 17 additions & 0 deletions lib/exq/support/coercion.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,21 @@ defmodule Exq.Support.Coercion do
raise ArgumentError,
message: "Failed to parse #{inspect(value)} into an integer."
end

def to_boolean(value) when is_boolean(value) do
value
end

@true_values ["true", "yes", "1"]
def to_boolean(value) when is_binary(value) do
case value |> String.trim() |> String.downcase() do
x when x in @true_values -> true
_ -> false
end
end

def to_boolean(value) do
raise ArgumentError,
message: "Failed to parse #{inspect(value)} into a boolean."
end
end
28 changes: 20 additions & 8 deletions lib/exq/support/opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ defmodule Exq.Support.Opts do
end

def connection_opts(opts \\ []) do
reconnect_on_sleep = opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep)
timeout = opts[:redis_timeout] || Config.get(:redis_timeout)
reconnect_on_sleep = Coercion.to_integer(opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep))
timeout = Coercion.to_integer(opts[:redis_timeout] || Config.get(:redis_timeout))
socket_opts = opts[:socket_opts] || Config.get(:socket_opts) || []

[backoff: reconnect_on_sleep, timeout: timeout, name: opts[:redis], socket_opts: socket_opts]
end

defp server_opts(:default, opts) do
scheduler_enable = opts[:scheduler_enable] || Config.get(:scheduler_enable)
scheduler_enable = Coercion.to_boolean(opts[:scheduler_enable] || Config.get(:scheduler_enable))
namespace = opts[:namespace] || Config.get(:namespace)
scheduler_poll_timeout = opts[:scheduler_poll_timeout] || Config.get(:scheduler_poll_timeout)
poll_timeout = opts[:poll_timeout] || Config.get(:poll_timeout)
shutdown_timeout = opts[:shutdown_timeout] || Config.get(:shutdown_timeout)
scheduler_poll_timeout = Coercion.to_integer(opts[:scheduler_poll_timeout] || Config.get(:scheduler_poll_timeout))
poll_timeout = Coercion.to_integer(opts[:poll_timeout] || Config.get(:poll_timeout))
shutdown_timeout = Coercion.to_integer(opts[:shutdown_timeout] || Config.get(:shutdown_timeout))

enqueuer = Exq.Enqueuer.Server.server_name(opts[:name])
stats = Exq.Stats.Server.server_name(opts[:name])
Expand All @@ -80,13 +80,13 @@ defmodule Exq.Support.Opts do
metadata = Exq.Worker.Metadata.server_name(opts[:name])

queue_configs = opts[:queues] || Config.get(:queues)
per_queue_concurrency = opts[:concurrency] || Config.get(:concurrency)
per_queue_concurrency = opts[:concurrency] || get_config_concurrency()
queues = get_queues(queue_configs)
concurrency = get_concurrency(queue_configs, per_queue_concurrency)
default_middleware = Config.get(:middleware)

[scheduler_enable: scheduler_enable, namespace: namespace,
scheduler_poll_timeout: scheduler_poll_timeout,workers_sup: workers_sup,
scheduler_poll_timeout: scheduler_poll_timeout, workers_sup: workers_sup,
poll_timeout: poll_timeout, enqueuer: enqueuer, metadata: metadata,
stats: stats, name: opts[:name], scheduler: scheduler, queues:
queues, redis: opts[:redis], concurrency: concurrency,
Expand All @@ -107,6 +107,18 @@ defmodule Exq.Support.Opts do
end)
end

defp get_config_concurrency() do
case Config.get(:concurrency) do
x when is_atom(x) -> x
x when is_integer(x) -> x
x when is_binary(x) ->
case x |> String.trim() |> String.downcase() do
"infinity" -> :infinity
x -> Coercion.to_integer(x)
end
end
end

defp get_concurrency(queue_configs, per_queue_concurrency) do
Enum.map(queue_configs, fn (queue_config) ->
case queue_config do
Expand Down
45 changes: 44 additions & 1 deletion test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,52 @@ defmodule Exq.ConfigTest do
assert mode == :api
end

test "redis_worker_opts from runtime environment" do
System.put_env("EXQ_NAMESPACE", "test")
System.put_env("EXQ_CONCURRENCY", "333")
System.put_env("EXQ_POLL_TIMEOUT", "17")
System.put_env("EXQ_SCHEDULER_POLL_TIMEOUT", "123")
System.put_env("EXQ_SCHEDULER_ENABLE", "True")
System.put_env("EXQ_SHUTDOWN_TIMEOUT", "1234")

Mix.Config.persist([
exq: [
namespace: {:system, "EXQ_NAMESPACE"},
concurrency: {:system, "EXQ_CONCURRENCY"},
poll_timeout: {:system, "EXQ_POLL_TIMEOUT"},
scheduler_poll_timeout: {:system, "EXQ_SCHEDULER_POLL_TIMEOUT"},
scheduler_enable: {:system, "EXQ_SCHEDULER_ENABLE"},
shutdown_timeout: {:system, "EXQ_SHUTDOWN_TIMEOUT"}
]
])

{Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])

assert server_opts[:namespace] == "test"
assert server_opts[:concurrency] == [{"default", 333, 0}]
assert server_opts[:poll_timeout] == 17
assert server_opts[:scheduler_poll_timeout] == 123
assert server_opts[:scheduler_enable] == true
assert server_opts[:shutdown_timeout] == 1234
end

test "redis_worker_opts from runtime environment - concurrency :infinity" do
System.put_env("EXQ_CONCURRENCY", "infinity")

Mix.Config.persist([
exq: [
concurrency: {:system, "EXQ_CONCURRENCY"},
]
])

{Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])

assert server_opts[:concurrency] == [{"default", :infinity, 0}]
end

test "custom redis module" do
with_application_env(:exq, :redis_worker, {RedisWorker, [1, 2]}, fn ->
{module, args, server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])
{module, args, _server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])
assert module == RedisWorker
assert args == [1, 2]
end)
Expand Down
14 changes: 14 additions & 0 deletions test/job_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,18 @@ defmodule JobQueueTest do
assert job.class == "MyWorker/perform"
assert job.retry == 10
end

test "max_retries from runtime environment" do
System.put_env("EXQ_MAX_RETRIES", "3")

Mix.Config.persist([exq: [max_retries: {:system, "EXQ_MAX_RETRIES"}]])

{:ok, jid} = JobQueue.enqueue(:testredis, "test", "default", MyWorker, [], [])
assert jid != nil

[{:ok, {job_str, _}}] = JobQueue.dequeue(:testredis, "test", @host, ["default"])
job = Poison.decode!(job_str, as: %Exq.Support.Job{})
assert job.jid == jid
assert job.retry == 3
end
end

0 comments on commit b955550

Please sign in to comment.