From 27a0c540d79cb5698b825423f2d96612ffd6ffbd Mon Sep 17 00:00:00 2001 From: Maria Efimenko Date: Thu, 22 Jun 2023 17:34:50 +0200 Subject: [PATCH 1/3] reject a faulty delivery logic --- delivery.go | 28 ---------------------------- queue.go | 33 +++++++++++++++++++++++---------- queue_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 38 deletions(-) diff --git a/delivery.go b/delivery.go index 51395e6..4b94f49 100644 --- a/delivery.go +++ b/delivery.go @@ -36,34 +36,6 @@ func (delivery *redisDelivery) Header() http.Header { return delivery.header } -func newDelivery( - ctx context.Context, - payload string, - unackedKey string, - rejectedKey string, - pushKey string, - redisClient RedisClient, - errChan chan<- error, -) (*redisDelivery, error) { - rd := redisDelivery{ - ctx: ctx, - payload: payload, - unackedKey: unackedKey, - rejectedKey: rejectedKey, - pushKey: pushKey, - redisClient: redisClient, - errChan: errChan, - } - - var err error - - if rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload); err != nil { - return nil, err - } - - return &rd, nil -} - func (delivery *redisDelivery) String() string { return fmt.Sprintf("[%s %s]", delivery.clearPayload, delivery.unackedKey) } diff --git a/queue.go b/queue.go index 198237c..f22cbb8 100644 --- a/queue.go +++ b/queue.go @@ -219,7 +219,7 @@ func (queue *redisQueue) consumeBatch() error { d, err := queue.newDelivery(payload) if err != nil { - return err + return fmt.Errorf("create new delivery: %w", err) } queue.deliveryChan <- d @@ -229,15 +229,28 @@ func (queue *redisQueue) consumeBatch() error { } func (queue *redisQueue) newDelivery(payload string) (Delivery, error) { - return newDelivery( - queue.ackCtx, - payload, - queue.unackedKey, - queue.rejectedKey, - queue.pushKey, - queue.redisClient, - queue.errChan, - ) + rd := &redisDelivery{ + ctx: queue.ackCtx, + payload: payload, + unackedKey: queue.unackedKey, + rejectedKey: queue.rejectedKey, + pushKey: queue.pushKey, + redisClient: queue.redisClient, + errChan: queue.errChan, + } + + var err error + rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload) + if err == nil { + return rd, nil + } + + rejectErr := rd.Reject() + if err != nil { + return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr) + } + + return nil, err } // StopConsuming can be used to stop all consumers on this queue. It returns a diff --git a/queue_test.go b/queue_test.go index a4c3431..8abf993 100644 --- a/queue_test.go +++ b/queue_test.go @@ -513,6 +513,46 @@ func TestReturnRejected(t *testing.T) { eventuallyRejected(t, queue, 0) } +func TestRejectFaultyMessages(t *testing.T) { + redisAddr, closer := testRedis(t) + defer closer() + + connection, err := OpenConnection("faulty-conn", "tcp", redisAddr, 1, nil) + require.NoError(t, err) + queue, err := connection.OpenQueue("faulty-q") + require.NoError(t, err) + _, err = queue.PurgeReady() + require.NoError(t, err) + + for i := 0; i < 6; i++ { + // if there is no line separator after the header in the message, + // it will lead to an error and the message will be rejected + err := queue.Publish(fmt.Sprintf("%sreturn-d%d", jsonHeaderSignature, i)) + require.NoError(t, err) + } + + eventuallyReady(t, queue, 6) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 0) + + require.NoError(t, queue.StartConsuming(10, time.Millisecond)) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 6) + + consumer := NewTestConsumer("faulty-cons") + consumer.AutoAck = false + _, err = queue.AddConsumer("cons", consumer) + require.NoError(t, err) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 6) + + require.Len(t, consumer.Deliveries(), 0) + + <-queue.StopConsuming() +} + func TestPushQueue(t *testing.T) { redisAddr, closer := testRedis(t) defer closer() From 4e138108e600d1f535521e960dd3d3aba66d516d Mon Sep 17 00:00:00 2001 From: Maria Efimenko Date: Fri, 23 Jun 2023 11:01:56 +0200 Subject: [PATCH 2/3] add a comment about why we reject a faulty delivery --- queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue.go b/queue.go index f22cbb8..4589558 100644 --- a/queue.go +++ b/queue.go @@ -212,7 +212,6 @@ func (queue *redisQueue) consumeBatch() error { if err == ErrorNotFound { return nil } - if err != nil { return err } @@ -245,6 +244,7 @@ func (queue *redisQueue) newDelivery(payload string) (Delivery, error) { return rd, nil } + // we need to reject a delivery here to move the delivery from the unacked to the rejected list. rejectErr := rd.Reject() if err != nil { return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr) From c97e3e077c56e735539c412eb56c815e612a33de Mon Sep 17 00:00:00 2001 From: Maria Efimenko Date: Fri, 23 Jun 2023 11:09:32 +0200 Subject: [PATCH 3/3] Update queue.go Co-authored-by: Christian Wellenbrock --- queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue.go b/queue.go index 4589558..57f8518 100644 --- a/queue.go +++ b/queue.go @@ -246,7 +246,7 @@ func (queue *redisQueue) newDelivery(payload string) (Delivery, error) { // we need to reject a delivery here to move the delivery from the unacked to the rejected list. rejectErr := rd.Reject() - if err != nil { + if rejectErr != nil { return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr) }