Skip to content

Commit

Permalink
[ISSUE apache#735] centralized management errors (apache#708)
Browse files Browse the repository at this point in the history
* centralized management errors

* update missed

* update missed

* fix import cycle

* go fmt
  • Loading branch information
180909 committed Nov 3, 2021
1 parent 4065610 commit 673760e
Show file tree
Hide file tree
Showing 20 changed files with 102 additions and 111 deletions.
5 changes: 2 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package rocketmq

import (
"context"

"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/errors"

"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand Down Expand Up @@ -134,5 +133,5 @@ type PullConsumer interface {
//
// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
return nil, errors.New("pull consumer has not supported")
return nil, errors.ErrPullConsumer
}
16 changes: 8 additions & 8 deletions benchmark/stable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package main

import (
"errors"
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -52,23 +52,23 @@ func (st *stableTest) buildFlags(name string) {

func (st *stableTest) checkFlag() error {
if st.topic == "" {
return errors.New("empty topic")
return errors.ErrEmptyTopic
}

if st.nameSrv == "" {
return errors.New("empty namesrv")
return errors.ErrEmptyNameSrv
}

if st.groupID == "" {
return errors.New("empty group id")
return errors.ErrEmptyGroupID
}

if st.testMin <= 0 {
return errors.New("test miniutes must be positive integer")
return errors.ErrTestMin
}

if st.opIntervalSec <= 0 {
return errors.New("operation interval must be positive integer")
return errors.ErrOperationInterval
}

return nil
Expand Down Expand Up @@ -114,7 +114,7 @@ func (stp *stableTestProducer) checkFlag() error {
return err
}
if stp.bodySize <= 0 {
return errors.New("message body size must be positive integer")
return errors.ErrMessageBody
}

return nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func (stc *stableTestConsumer) checkFlag() error {
}

if stc.expression == "" {
return errors.New("empty expression")
return errors.ErrEmptyExpression
}
return nil
}
Expand Down
9 changes: 2 additions & 7 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"sort"
"strconv"
"strings"
Expand All @@ -29,7 +30,6 @@ import (

jsoniter "github.com/json-iterator/go"

"github.com/pkg/errors"
"github.com/tidwall/gjson"

"github.com/apache/rocketmq-client-go/v2/internal"
Expand Down Expand Up @@ -68,11 +68,6 @@ const (
_SubAll = "*"
)

var (
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
)

// Message model defines the way how messages are delivered to each consumer clients.
// </p>
//
Expand Down Expand Up @@ -822,7 +817,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
rlog.Warning("no broker found for mq", map[string]interface{}{
rlog.LogKeyMessageQueue: queue,
})
return nil, ErrBrokerNotFound
return nil, errors.ErrBrokerNotFound
}

if brokerResult.Slave {
Expand Down
8 changes: 4 additions & 4 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package consumer
import (
"context"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"sync"
"sync/atomic"

"github.com/pkg/errors"

"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
Expand Down Expand Up @@ -174,15 +174,15 @@ func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQ
}

if mq == nil {
return utils.ErrMQEmpty
return errors2.ErrMQEmpty
}

if offset < 0 {
return utils.ErrOffset
return errors2.ErrOffset
}

if numbers <= 0 {
return utils.ErrNumbers
return errors2.ErrNumbers
}
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package consumer
import (
"context"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
"strconv"
"strings"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (pc *pushConsumer) Start() error {
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = ErrCreated
err = errors2.ErrCreated
return
}

Expand Down Expand Up @@ -233,7 +234,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
return errors2.ErrStartTopic
}

if pc.option.Namespace != "" {
Expand Down
29 changes: 0 additions & 29 deletions errors.go

This file was deleted.

50 changes: 50 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

import "errors"

var (
ErrRequestTimeout = errors.New("equest timeout")
ErrMQEmpty = errors.New("MessageQueue is nil")
ErrOffset = errors.New("offset < 0")
ErrNumbers = errors.New("numbers < 0")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyNameSrv = errors.New("empty namesrv")
ErrEmptyGroupID = errors.New("empty group id")
ErrTestMin = errors.New("test minutes must be positive integer")
ErrOperationInterval = errors.New("operation interval must be positive integer")
ErrMessageBody = errors.New("message body size must be positive integer")
ErrEmptyExpression = errors.New("empty expression")
ErrCreated = errors.New("consumer group has been created")
ErrBrokerNotFound = errors.New("broker can not found")
ErrStartTopic = errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
ErrResponse = errors.New("response error")
ErrCompressLevel = errors.New("unsupported compress level")
ErrUnknownIP = errors.New("unknown IP address")
ErrService = errors.New("service close is not running, please check")
ErrTopicNotExist = errors.New("topic not exist")
ErrNotExisted = errors.New("not existed")
ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
ErrMultiIP = errors.New("multiple IP addr does not support")
ErrIllegalIP = errors.New("IP addr error")
ErrTopicEmpty = errors.New("topic is nil")
ErrMessageEmpty = errors.New("message is nil")
ErrNotRunning = errors.New("producer not started")
ErrPullConsumer = errors.New("pull consumer has not supported")
)
3 changes: 2 additions & 1 deletion examples/consumer/pull/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"time"

"github.com/apache/rocketmq-client-go/v2"
Expand Down Expand Up @@ -52,7 +53,7 @@ func main() {
for {
resp, err := c.PullFrom(ctx, queue, offset, 10)
if err != nil {
if err == rocketmq.ErrRequestTimeout {
if err == errors.ErrRequestTimeout {
fmt.Printf("timeout \n")
time.Sleep(1 * time.Second)
continue
Expand Down
3 changes: 2 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -55,7 +56,7 @@ const (
)

var (
ErrServiceState = errors.New("service close is not running, please check")
ErrServiceState = errors2.ErrService

_VIPChannelEnable = false
)
Expand Down
5 changes: 2 additions & 3 deletions internal/remote/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package remote

import (
"context"
"github.com/apache/rocketmq-client-go/v2/errors"
"sync"

"github.com/apache/rocketmq-client-go/v2/internal/utils"
)

// ResponseFuture
Expand Down Expand Up @@ -62,7 +61,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
case <-r.Done:
cmd, err = r.ResponseCommand, r.Err
case <-r.ctx.Done():
err = utils.ErrRequestTimeout
err = errors.ErrRequestTimeout
r.Err = err
}
return cmd, err
Expand Down
12 changes: 5 additions & 7 deletions internal/remote/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package remote
import (
"bytes"
"context"
"errors"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"net"
"reflect"
"sync"
"testing"
"time"

"github.com/apache/rocketmq-client-go/v2/internal/utils"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -80,12 +78,12 @@ func TestResponseFutureWaitResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
defer cancel()
future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
if _, err := future.waitResponse(); err != errors.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
utils.ErrRequestTimeout, err)
errors.ErrRequestTimeout, err)
}
future = NewResponseFuture(context.Background(), 10, nil)
responseError := errors.New("response error")
responseError := errors.ErrResponse
go func() {
time.Sleep(100 * time.Millisecond)
future.Err = responseError
Expand Down Expand Up @@ -295,7 +293,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, utils.ErrRequestTimeout, r.Err)
assert.Equal(t, errors.ErrRequestTimeout, r.Err)
wg.Done()
})
assert.Nil(t, err, "failed to invokeSync.")
Expand Down
8 changes: 2 additions & 6 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package internal

import (
"context"
"errors"
"github.com/apache/rocketmq-client-go/v2/errors"
"math/rand"
"sort"
"strconv"
Expand All @@ -46,10 +46,6 @@ const (
MasterId = int64(0)
)

var (
ErrTopicNotExist = errors.New("topic not exist")
)

func (s *namesrvs) cleanOfflineBroker() {
// TODO optimize
s.lockNamesrv.Lock()
Expand Down Expand Up @@ -434,7 +430,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
}
return routeData, nil
case ResTopicNotExist:
return nil, ErrTopicNotExist
return nil, errors.ErrTopicNotExist
default:
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
Expand Down
Loading

0 comments on commit 673760e

Please sign in to comment.