Skip to content

Commit

Permalink
Merge pull request #1649 from apache/feat-adasvc
Browse files Browse the repository at this point in the history
feat: add adaptive service
  • Loading branch information
justxuewei committed Dec 7, 2021
2 parents e4fa5df + bfc336e commit 55e9fdf
Show file tree
Hide file tree
Showing 36 changed files with 1,237 additions and 55 deletions.
55 changes: 55 additions & 0 deletions cluster/cluster/adaptivesvc/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package adaptivesvc

import (
"sync"
)

import (
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
once sync.Once
instance clusterpkg.Cluster
)

func init() {
extension.SetCluster(constant.ClusterKeyAdaptiveService, newAdaptiveServiceCluster)
}

// adaptiveServiceCluster is a cluster for adaptive service.
type adaptiveServiceCluster struct{}

func newAdaptiveServiceCluster() clusterpkg.Cluster {
if instance == nil {
once.Do(func() {
instance = &adaptiveServiceCluster{}
})
}
return instance
}

func (c *adaptiveServiceCluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(newAdaptiveServiceClusterInvoker(directory))
}
85 changes: 85 additions & 0 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package adaptivesvc

import (
"context"
"strconv"
)

import (
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type adaptiveServiceClusterInvoker struct {
base.ClusterInvoker
}

func newAdaptiveServiceClusterInvoker(directory directory.Directory) protocol.Invoker {
return &adaptiveServiceClusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := ivk.Directory.List(invocation)
if err := ivk.CheckInvokers(invokers, invocation); err != nil {
return protocol.NewRPCResult(nil, err)
}

// get loadBalance
lbKey := invokers[0].GetURL().GetParam(constant.LoadbalanceKey, constant.LoadBalanceKeyP2C)
if lbKey != constant.LoadBalanceKeyP2C {
return protocol.NewRPCResult(nil, perrors.Errorf("adaptive service not supports %s load balance", lbKey))
}
lb := extension.GetLoadbalance(lbKey)

// select a node by the loadBalance
invoker := lb.Select(invokers, invocation)

// invoke
result := invoker.Invoke(ctx, invocation)

// update metrics
remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
return result
}
logger.Debugf("[adasvc cluster] The server status was received successfully, %s: %#v",
constant.AdaptiveServiceRemainingKey, remainingStr)
err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.HillClimbing, uint64(remaining))
if err != nil {
logger.Warnf("adaptive service metrics update is failed, err: %v", err)
return protocol.NewRPCResult(nil, err)
}

return result
}
6 changes: 3 additions & 3 deletions cluster/cluster/available/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
return protocol.NewRPCResult(nil, err)
}

err = invoker.CheckWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
return protocol.NewRPCResult(nil, err)
}

for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
return protocol.NewRPCResult(nil, errors.New(fmt.Sprintf("no provider available in %v", invokers)))
}
2 changes: 1 addition & 1 deletion cluster/cluster/available/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
mockResult := protocol.NewRPCResult(clusterpkg.Rest{Tried: 0, Success: true}, nil)
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
1 change: 1 addition & 0 deletions cluster/cluster_impl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package cluster_impl
// This package may be DEPRECATED OR REMOVED in the future.

import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/adaptivesvc"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback"
Expand Down
129 changes: 129 additions & 0 deletions cluster/loadbalance/p2c/loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package p2c

import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newLoadBalance)
}

var (
once sync.Once
instance loadbalance.LoadBalance
)

type loadBalance struct{}

func newLoadBalance() loadbalance.LoadBalance {
if instance == nil {
once.Do(func() {
instance = &loadBalance{}
})
}
return instance
}

func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
if len(invokers) == 0 {
return nil
}
if len(invokers) == 1 {
return invokers[0]
}
// m is the Metrics, which saves the metrics of instance, invokers and methods
// The local metrics is available only for the earlier version.
m := metrics.LocalMetrics
// picks two nodes randomly
var i, j int
if len(invokers) == 2 {
i, j = 0, 1
} else {
rand.Seed(time.Now().Unix())
i = rand.Intn(len(invokers))
j = i
for i == j {
j = rand.Intn(len(invokers))
}
}
logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
i, j, invokers[i], invokers[j])

// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
// remainingIIface, remainingJIface means remaining capacity of node i and node j.
// If one of the metrics is empty, invoke the invocation to that node directly.
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
return invokers[i]
}
logger.Warnf("get method metrics err: %v", err)
return nil
}

remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
return invokers[j]
}
logger.Warnf("get method metrics err: %v", err)
return nil
}

// Convert interface to int, if the type is unexpected, panic immediately
remainingI, ok := remainingIIface.(uint64)
if !ok {
panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T",
metrics.HillClimbing, remainingIIface))
}

remainingJ, ok := remainingJIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}

logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)

// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
logger.Debugf("[P2C select] The invoker[i] was selected.")
return invokers[i]
}

logger.Debugf("[P2C select] The invoker[j] was selected.")
return invokers[j]
}
22 changes: 22 additions & 0 deletions cluster/metrics/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

const (
HillClimbing = "hill-climbing"
)
Loading

0 comments on commit 55e9fdf

Please sign in to comment.