Skip to content

Commit

Permalink
Better handling if Redis is down - bubble up errors
Browse files Browse the repository at this point in the history
* Catch and handle exceptions related to Redis
* Remove this handling from JobQueue and bubble the results up instead
* Manager backs off on any Redis error
* Enqueue returns an error when there are Redis issues

To be done:
* The long-term plan is to break each queue fetcher out into a separate
GenServer so that we can use BRPOPLPUSH; this logic would then move there.
  • Loading branch information
akira committed Oct 29, 2015
1 parent 6eb4b72 commit ce84e3d
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 63 deletions.
24 changes: 12 additions & 12 deletions lib/exq/enqueuer/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,36 @@ defmodule Exq.Enqueuer.Server do
end

def handle_cast({:enqueue, from, queue, worker, args}, state) do
jid = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args)
GenServer.reply(from, {:ok, jid})
response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args)
GenServer.reply(from, response)
{:noreply, state}
end

def handle_cast({:enqueue_at, from, queue, time, worker, args}, state) do
jid = JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args)
GenServer.reply(from, {:ok, jid})
response = JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args)
GenServer.reply(from, response)
{:noreply, state}
end

def handle_cast({:enqueue_in, from, queue, offset, worker, args}, state) do
jid = JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args)
GenServer.reply(from, {:ok, jid})
response = JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args)
GenServer.reply(from, response)
{:noreply, state}
end

def handle_call({:enqueue, queue, worker, args}, _from, state) do
jid = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args)
{:reply, {:ok, jid}, state}
response = JobQueue.enqueue(state.redis, state.namespace, queue, worker, args)
{:reply, response, state}
end

def handle_call({:enqueue_at, queue, time, worker, args}, _from, state) do
jid = JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args)
{:reply, {:ok, jid}, state}
response = JobQueue.enqueue_at(state.redis, state.namespace, queue, time, worker, args)
{:reply, response, state}
end

def handle_call({:enqueue_in, queue, offset, worker, args}, _from, state) do
jid = JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args)
{:reply, {:ok, jid}, state}
response = JobQueue.enqueue_in(state.redis, state.namespace, queue, offset, worker, args)
{:reply, response, state}
end

def handle_call({:stop}, _from, state) do
Expand Down
22 changes: 15 additions & 7 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,17 @@ defmodule Exq.Manager.Server do
Enqueuer.enqueue_in(state.enqueuer, from, queue, offset, worker, args)
{:noreply, state, 10}
end

# Subscribes the manager to `queue` using add_queue's default concurrency.
# The caller reference is not needed, so it is bound as `_from`.
# Replies :ok; the trailing 0 is the GenServer timeout value, so a :timeout
# message is delivered immediately if nothing else is waiting in the mailbox.
def handle_call({:subscribe, queue}, _from, state) do
  updated_state = add_queue(state, queue)
  {:reply, :ok, updated_state, 0}
end

# Subscribes the manager to `queue` with an explicit `concurrency` value.
# The caller reference is not needed, so it is bound as `_from`.
# Replies :ok; the trailing 0 is the GenServer timeout value, so a :timeout
# message is delivered immediately if nothing else is waiting in the mailbox.
def handle_call({:subscribe, queue, concurrency}, _from, state) do
  updated_state = add_queue(state, queue, concurrency)
  {:reply, :ok, updated_state, 0}
end

def handle_call({:unsubscribe, queue}, from, state) do
updated_state = remove_queue(state, queue)
{:reply, :ok, updated_state,0}
Expand All @@ -139,6 +139,11 @@ defmodule Exq.Manager.Server do
{:noreply, updated_state, timeout}
end

# Fallback clause: accepts any message, logs it as an error and carries on
# with the state unchanged, using state.timeout as the GenServer timeout.
def handle_info(info, state) do
  Logger.error("UNKNOWN CALL #{Kernel.inspect info}")
  {:noreply, state, state.timeout}
end

# Hot-upgrade callback: the state is carried over as-is, no migration needed.
def code_change(_old_version, state, _extra), do: {:ok, state}
Expand All @@ -158,8 +163,11 @@ defmodule Exq.Manager.Server do
def dequeue_and_dispatch(state, queues) do
try do
case Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, queues) do
{:none, _} -> {state, state.poll_timeout}
{job, queue} -> {dispatch_job(state, job, queue), 0}
{:ok, {:none, _}} -> {state, state.poll_timeout}
{:ok, {job, queue}} -> {dispatch_job(state, job, queue), 0}
{status, reason} ->
Logger.error("Redis Error #{Kernel.inspect(status)} #{Kernel.inspect(reason)}. Backing off...")
{state, state.poll_timeout * 10}
end
catch
:exit, e ->
Expand Down Expand Up @@ -202,14 +210,14 @@ defmodule Exq.Manager.Server do
end)
{queues, work_table}
end

# Registers `queue` with the manager: stores a {queue, concurrency, 0} row in
# the ETS work table and prepends the queue to state.queues.
# `concurrency` falls back to the :concurrency config value (10_000 if unset).
# NOTE(review): the trailing 0 in the row looks like an initial busy-worker
# count - confirm against the code that reads the work table.
defp add_queue(state, queue, concurrency \\ Config.get(:concurrency, 10_000)) do
  :ets.insert(state.work_table, {queue, concurrency, 0})
  %{state | queues: [queue | state.queues]}
end

defp remove_queue(state, queue) do
:ets.delete(state.work_table, queue)
updated_queues = List.delete(state.queues, queue)
Expand Down
12 changes: 7 additions & 5 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Exq.Redis.Connection do
require Logger

alias Exq.Support.Config

Expand Down Expand Up @@ -101,11 +102,12 @@ defmodule Exq.Redis.Connection do
res
end

def lpop!(redis, key) do
case q(redis, ["LPOP", key]) do
{:ok, :undefined} -> :none
{:ok, value} -> value
end
# Issues LPOP for `key` and returns the result of q/2 unmodified, so callers
# see errors as well as successes and must handle both.
def lpop(redis, key), do: q(redis, ["LPOP", key])

# Issues ZADD to add `member` with `score` to the sorted set `set`, returning
# the result of q/2 unmodified (non-raising counterpart of zadd!/4).
def zadd(redis, set, score, member), do: q(redis, ["ZADD", set, score, member])

def zadd!(redis, set, score, member) do
Expand Down
47 changes: 32 additions & 15 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,29 @@ defmodule Exq.Redis.JobQueue do

def enqueue(redis, namespace, queue, worker, args) do
{jid, job_json} = to_job_json(queue, worker, args)
enqueue(redis, namespace, queue, job_json)
jid
case enqueue(redis, namespace, queue, job_json) do
:ok -> {:ok, jid}
other -> other
end
end
def enqueue(redis, namespace, job_json) do
job = Json.decode!(job_json)
enqueue(redis, namespace, job["queue"], job_json)
job["jid"]
case enqueue(redis, namespace, job["queue"], job_json) do
:ok -> {:ok, job["jid"]}
error -> error
end

end
def enqueue(redis, namespace, queue, job_json) do
try do
[{:ok, _}, {:ok, _}] = :eredis.qp(redis, [
response = :eredis.qp(redis, [
["SADD", full_key(namespace, "queues"), queue],
["RPUSH", queue_key(namespace, queue), job_json]], Config.get(:redis_timeout, 5000))

case response do
[{:ok, _}, {:ok, _}] -> :ok
other -> other
end
catch
:exit, e ->
Logger.info("Error enqueueing - #{Kernel.inspect e}")
Expand All @@ -63,12 +73,14 @@ defmodule Exq.Redis.JobQueue do
end

def enqueue_at(redis, namespace, queue, time, worker, args) do
enqueued_at = DateFormat.format!(Date.from(time, :timestamp) |> Date.local, "{ISO}")
{jid, job_json} = to_job_json(queue, worker, args, enqueued_at)
score = time_to_score(time)
try do
enqueued_at = DateFormat.format!(Date.from(time, :timestamp) |> Date.local, "{ISO}")
{jid, job_json} = to_job_json(queue, worker, args, enqueued_at)
score = time_to_score(time)
Connection.zadd!(redis, scheduled_queue_key(namespace), score, job_json)
jid
case Connection.zadd(redis, scheduled_queue_key(namespace), score, job_json) do
{:ok, _} -> {:ok, jid}
other -> other
end
catch
:exit, e ->
Logger.info("Error enqueueing - #{Kernel.inspect e}")
Expand All @@ -80,7 +92,12 @@ defmodule Exq.Redis.JobQueue do
dequeue_random(redis, namespace, queues)
end
def dequeue(redis, namespace, queue) do
{Connection.lpop!(redis, queue_key(namespace, queue)), queue}
# normalize empty return values
case Connection.lpop(redis, queue_key(namespace, queue)) do
{status, :undefined} -> {status, {:none, queue}}
{status, nil} -> {status, {:none, queue}}
{status, value} -> {status, {value, queue}}
end
end

def scheduler_dequeue(redis, namespace, queues) when is_list(queues) do
Expand Down Expand Up @@ -124,14 +141,14 @@ defmodule Exq.Redis.JobQueue do
end

defp dequeue_random(_redis, _namespace, []) do
{:none, nil}
{:ok, {:none, nil}}
end
defp dequeue_random(redis, namespace, queues) do
[h | rq] = Exq.Support.Shuffle.shuffle(queues)
case dequeue(redis, namespace, h) do
{nil, _} -> dequeue_random(redis, namespace, rq)
{:none, _} -> dequeue_random(redis, namespace, rq)
{job, q} -> {job, q}
{:ok, {:none, _}} -> dequeue_random(redis, namespace, rq)
{:ok, {job, q}} -> {:ok, {job, q}}
{:error, reason} -> {:error, reason}
end
end

Expand Down
61 changes: 61 additions & 0 deletions test/failure_scenarios_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
Code.require_file "test_helper.exs", __DIR__

# Failure-scenario tests: each test starts Exq, stops the Redis server that
# TestRedis manages, and checks how Exq behaves while Redis is down and after
# it has been started again.
defmodule FailureScenariosTest do
use ExUnit.Case
use Timex
import ExqTestUtil

# Worker whose perform/0 sends {:worked} to the process registered as :exqtest.
# NOTE(review): nothing in this module references this worker - the enqueue_at
# call below names ExqTest.PerformWorker instead. Confirm which was intended.
defmodule PerformWorker do
def perform do
send :exqtest, {:worked}
end
end

# Start Redis before every test; when the test exits, wait and then tear the
# Redis instance down again.
setup do
TestRedis.start
on_exit fn ->
wait
TestRedis.teardown
end
:ok
end

# Redis goes away while the manager is running; once Redis is back, Exq must
# be reported as up by assert_exq_up.
test "handle Redis connection lost on manager" do
# NOTE(review): port 6555 is presumably the port TestRedis listens on - confirm.
{:ok, _} = Exq.start_link([name: :exq_f, port: 6555 ])

# Stop Redis and wait for a bit
TestRedis.stop
# Not ideal - but seems to be min time for manager to die past supervision
:timer.sleep(5000)

# Starting Redis again, things should be back to normal
TestRedis.start
wait_long
assert_exq_up(:exq_f)
Exq.stop(:exq_f)
end

# Enqueue calls made while Redis is stopped must return
# {:error, :no_connection} to the caller; after Redis is restarted Exq must
# be reported as up again.
test "handle Redis connection lost on enqueue" do
# Start Exq but don't listen to any queues
{:ok, _} = Exq.start_link([name: :exq_f, port: 6555])

# Stop Redis
TestRedis.stop
wait_long

# enqueue with redis stopped
enq_result = Exq.enqueue(:exq_f, "default", "FakeWorker", [])
assert enq_result == {:error, :no_connection}

# Same expectation for the scheduled (enqueue_at) path.
enq_result = Exq.enqueue_at(:exq_f, "default", Time.now, ExqTest.PerformWorker, [])
assert enq_result == {:error, :no_connection}

# Starting Redis again and things should be back to normal
wait_long
TestRedis.start
wait_long

assert_exq_up(:exq_f)
Exq.stop(:exq_f)
end
end
2 changes: 2 additions & 0 deletions test/flaky_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule FlakyConnectionTest do
require Logger
import ExqTestUtil

@moduletag :flakey_connection

setup do
TestRedis.setup
on_exit fn ->
Expand Down
51 changes: 30 additions & 21 deletions test/job_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,54 @@ defmodule JobQueueTest do
end
end

# Dequeues once from `queues` (a queue name or a list of names) in the "test"
# namespace and asserts on the outcome.
#
#   expect_result truthy -> the dequeue must succeed AND yield a job
#   expect_result falsy  -> the dequeue must succeed and yield :none
#
# The success shape is matched explicitly in both branches so that an
# {:error, reason} reply from Redis fails the test instead of being counted
# as "a job was dequeued".
def assert_dequeue_job(queues, expect_result) do
  result = JobQueue.dequeue(:testredis, "test", queues)

  if expect_result do
    assert {:ok, {job, _queue}} = result
    assert job != :none
  else
    assert {:ok, {:none, _}} = result
  end
end

test "enqueue/dequeue single queue" do
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [])
{deq, _} = JobQueue.dequeue(:testredis, "test", "default")
{:ok, {deq, _}} = JobQueue.dequeue(:testredis, "test", "default")
assert deq != :none
{deq, _} = JobQueue.dequeue(:testredis, "test", "default")
{:ok, {deq, _}} = JobQueue.dequeue(:testredis, "test", "default")
assert deq == :none
end

test "enqueue/dequeue multi queue" do
JobQueue.enqueue(:testredis, "test", "default", MyWorker, [])
JobQueue.enqueue(:testredis, "test", "myqueue", MyWorker, [])
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) == :none
assert_dequeue_job(["default", "myqueue"], true)
assert_dequeue_job(["default", "myqueue"], true)
assert_dequeue_job(["default", "myqueue"], false)
end

test "scheduler_dequeue single queue" do
JobQueue.enqueue_in(:testredis, "test", "default", 0, MyWorker, [])
JobQueue.enqueue_in(:testredis, "test", "default", 0, MyWorker, [])
assert JobQueue.scheduler_dequeue(:testredis, "test", ["default"]) == 2
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) == :none
assert_dequeue_job("default", true)
assert_dequeue_job("default", true)
assert_dequeue_job("default", false)
end

test "scheduler_dequeue multi queue" do
JobQueue.enqueue_in(:testredis, "test", "default", -1, MyWorker, [])
JobQueue.enqueue_in(:testredis, "test", "myqueue", -1, MyWorker, [])
assert JobQueue.scheduler_dequeue(:testredis, "test", ["default", "myqueue"]) == 2
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", ["default", "myqueue"]), 0) == :none
assert_dequeue_job(["default", "myqueue"], true)
assert_dequeue_job(["default", "myqueue"], true)
assert_dequeue_job(["default", "myqueue"], false)
end

test "scheduler_dequeue enqueue_at" do
JobQueue.enqueue_at(:testredis, "test", "default", Time.now, MyWorker, [])
assert JobQueue.scheduler_dequeue(:testredis, "test", ["default"]) == 1
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) == :none
assert_dequeue_job("default", true)
assert_dequeue_job("default", false)
end

test "scheduler_dequeue max_score" do
Expand Down Expand Up @@ -83,12 +91,13 @@ defmodule JobQueueTest do
assert Exq.Enqueuer.Server.queue_size(:testredis, "test", "default") == "5"
assert Exq.Enqueuer.Server.queue_size(:testredis, "test", :scheduled) == "0"

assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) != :none
assert elem(JobQueue.dequeue(:testredis, "test", "default"), 0) == :none

assert_dequeue_job("default", true)
assert_dequeue_job("default", true)
assert_dequeue_job("default", true)
assert_dequeue_job("default", true)
assert_dequeue_job("default", true)
assert_dequeue_job("default", false)
end

test "full_key" do
Expand All @@ -98,10 +107,10 @@ defmodule JobQueueTest do
end

test "creates and returns a jid" do
jid = JobQueue.enqueue(:testredis, "test", "default", MyWorker, [])
{:ok, jid} = JobQueue.enqueue(:testredis, "test", "default", MyWorker, [])
assert jid != nil

{job_str, _} = JobQueue.dequeue(:testredis, "test", "default")
{:ok, {job_str, _}} = JobQueue.dequeue(:testredis, "test", "default")
job = Poison.decode!(job_str, as: Exq.Support.Job)
assert job.jid == jid
end
Expand Down
Loading

0 comments on commit ce84e3d

Please sign in to comment.