Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add adaptive service #1649

Merged
merged 43 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8f8cf96
feat(adasvc): add infrastructure for adaptive service
justxuewei Oct 29, 2021
4dfc01e
feat(adasvc): reference config support adaptive service
justxuewei Oct 29, 2021
2ca1018
feat(adasvc): add p2c load balance
justxuewei Oct 29, 2021
e83a8a6
feat(cluster): add capacity evaluator interface
justxuewei Oct 31, 2021
f0c64e6
feat(cluster): add capacity updater
justxuewei Nov 1, 2021
69dd9d3
feat(cluster): add capacity updater
justxuewei Nov 3, 2021
2db43f7
feat(cluster): add fields to vegas capeva
justxuewei Nov 3, 2021
588f392
feat(cluster): refactor capeva interface
justxuewei Nov 4, 2021
e0a4d1a
feat(cluster): add more fields to vegas capeva
justxuewei Nov 4, 2021
9af855a
feat(cluster): vegas evaupdater done
justxuewei Nov 5, 2021
785c74c
Merge branch '3.0' into feat/adasvc
justxuewei Nov 6, 2021
1d5fca1
fix(common): fix typo
justxuewei Nov 6, 2021
7a537ff
fix(common): fix typo
justxuewei Nov 6, 2021
6d866b5
fix(cluster): add apache license
justxuewei Nov 6, 2021
865c058
feat(cluster): define limiter & update interface
justxuewei Nov 15, 2021
9d26e68
Merge branch '3.0' into feat/adasvc
justxuewei Nov 15, 2021
c3f9dbe
feat(cluster): remove cpu stat temporarily
justxuewei Nov 16, 2021
ff4e60b
feat(cluster): update hill climbing limiter
justxuewei Nov 17, 2021
0472bf1
feat(cluster): hill climbing done
justxuewei Nov 19, 2021
7e98a27
Merge branch '3.0' into feat/adasvc
justxuewei Nov 19, 2021
70ca487
fix(cluster): fix issue where init limitation is 0
justxuewei Nov 20, 2021
c949cef
Merge branch '3.0' into feat/adasvc
justxuewei Nov 20, 2021
ece019f
feat(cluster): provder-side filter done
justxuewei Nov 20, 2021
bf43b0b
fix(cluster): fix uint64 subtraction issue
justxuewei Nov 20, 2021
d8ca7f1
fix(cluster): add adaptivesvc filter to default service filters
justxuewei Nov 22, 2021
30dcb14
style: go fmt
justxuewei Nov 22, 2021
5b48b40
fix(filter): import adaptivesvc
justxuewei Nov 22, 2021
99f6919
Merge branch '3.0' into feat/adasvc
justxuewei Nov 22, 2021
f906714
Merge branch '3.0' into feat/adasvc
justxuewei Nov 25, 2021
c7edac5
fix(imports): import adaptivesvc cluster and p2c loadbalance
justxuewei Nov 26, 2021
73b4f70
fix(config): fix unexpectedly panic
justxuewei Nov 26, 2021
1fa48ca
feat(adasvc): add debug logs
justxuewei Dec 1, 2021
0acc52e
fix(adasvc): pass attachements with string
justxuewei Dec 1, 2021
e5de62f
feat(adasvc): detail debug logs
justxuewei Dec 2, 2021
6f1d7bf
fix(adasvc): fix log info
justxuewei Dec 2, 2021
4a04d4a
feat: detail dubbo logs
justxuewei Dec 5, 2021
4ed7505
feat: remove useless logs
justxuewei Dec 5, 2021
8b12eac
fix(adasvc): fix incorrect type
justxuewei Dec 5, 2021
9ac6763
Merge branch '3.0' into feat-adasvc
justxuewei Dec 5, 2021
c23612b
style: go fmt & dubbofmt
justxuewei Dec 5, 2021
0f7459b
fix: rpc result attrs is not initialized
justxuewei Dec 5, 2021
429a336
fix(protocol): fix result panic when attrs is not initialized
justxuewei Dec 5, 2021
bfc336e
Merge branch '3.0' into feat-adasvc
justxuewei Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(cluster): provder-side filter done
  • Loading branch information
justxuewei committed Nov 20, 2021
commit ece019ff2a57e3b285d6ce2875122bb640902d0b
25 changes: 18 additions & 7 deletions cluster/cluster/adaptivesvc/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,30 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
"sync"
)

var (
once sync.Once
instance clusterpkg.Cluster
)

func init() {
extension.SetCluster(constant.ClusterKeyAdaptiveService, newCluster)
extension.SetCluster(constant.ClusterKeyAdaptiveService, newAdaptiveServiceCluster)
}

// cluster is a cluster for adaptive service.
type cluster struct{}
// adaptiveServiceCluster is a cluster for adaptive service.
type adaptiveServiceCluster struct{}

func newCluster() clusterpkg.Cluster {
return &cluster{}
func newAdaptiveServiceCluster() clusterpkg.Cluster {
if instance == nil {
once.Do(func() {
instance = &adaptiveServiceCluster{}
})
}
return instance
}

func (c *cluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(NewClusterInvoker(directory))
func (c *adaptiveServiceCluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(newAdaptiveServiceClusterInvoker(directory))
}
26 changes: 21 additions & 5 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,51 @@ import (
"context"
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
perrors "github.com/pkg/errors"
)

type clusterInvoker struct {
type adaptiveServiceClusterInvoker struct {
base.ClusterInvoker
}

func NewClusterInvoker(directory directory.Directory) protocol.Invoker {
return &clusterInvoker{
func newAdaptiveServiceClusterInvoker(directory directory.Directory) protocol.Invoker {
return &adaptiveServiceClusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (ivk *clusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := ivk.Directory.List(invocation)
if err := ivk.CheckInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}

// get loadBalance
lbKey := invokers[0].GetURL().GetParam(constant.LoadbalanceKey, constant.LoadBalanceKeyP2C)
if lbKey != constant.LoadBalanceKeyP2C {
return &protocol.RPCResult{
Err: perrors.Errorf("adaptive service not supports %s load balance", lbKey),
}
}
lb := extension.GetLoadbalance(lbKey)

// select a node by the loadBalance
invoker := lb.Select(invokers, invocation)
return invoker.Invoke(ctx, invocation)

// invoke
result := invoker.Invoke(ctx, invocation)

// update metrics
remaining := invocation.Attachments()[constant.AdaptiveServiceRemainingKey]
err := metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.HillClimbing, remaining)
if err != nil {
return &protocol.RPCResult{Err: err}
}

return result
}
34 changes: 22 additions & 12 deletions cluster/loadbalance/p2c/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,28 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newLoadBalance)
}

type loadBalance struct {
}
var (
once sync.Once
instance loadbalance.LoadBalance
)

type loadBalance struct{}

func newLoadBalance() loadbalance.LoadBalance {
return &loadBalance{}
if instance == nil {
once.Do(func() {
instance = &loadBalance{}
})
}
return instance
}

func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
Expand Down Expand Up @@ -66,9 +76,9 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
// viInterface, vjInterface means vegas latency of node i and node j
// remainingIIface, remainingJIface means remaining capacity of node i and node j.
// If one of the metrics is empty, invoke the invocation to that node directly.
viInterface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.Vegas)
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
return invokers[i]
Expand All @@ -77,7 +87,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
return nil
}

vjInterface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.Vegas)
remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
return invokers[j]
Expand All @@ -87,18 +97,18 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
}

// Convert interface to int, if the type is unexpected, panic immediately
vi, ok := viInterface.(int)
remainingI, ok := remainingIIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be int, but gets %T", metrics.Vegas, viInterface))
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingIIface))
}

vj, ok := vjInterface.(int)
remainingJ, ok := remainingJIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be int, but gets %T", metrics.Vegas, viInterface))
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}

// For the latency time, the smaller, the better.
if vi < vj {
// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
return invokers[i]
}

Expand Down
2 changes: 1 addition & 1 deletion cluster/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package metrics

const (
Vegas = "vegas"
HillClimbing = "hill-climbing"
)
12 changes: 11 additions & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
const (
AccessLogFilterKey = "accesslog"
ActiveFilterKey = "active"
AdaptiveServiceProviderFilterKey = "adaptive-service-provider"
AuthConsumerFilterKey = "sign"
AuthProviderFilterKey = "auth"
EchoFilterKey = "echo"
Expand Down Expand Up @@ -340,10 +341,19 @@ const (
)

// Generic Filter

const (
GenericSerializationDefault = "true"
// disable "protobuf-json" temporarily
//GenericSerializationProtobuf = "protobuf-json"
GenericSerializationGson = "gson"
)

// AdaptiveService Filter
// goland:noinspection ALL
const (
// attribute keys
AdaptiveServiceUpdaterKey = "adaptive-service.updater"
// attachment keys
AdaptiveServiceRemainingKey = "adaptive-service.remaining"
AdaptiveServiceInflightKey = "adaptive-service.inflight"
)
20 changes: 12 additions & 8 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
)

// ProviderConfig is the default configuration of service provider
Expand All @@ -40,14 +41,13 @@ type ProviderConfig struct {
// TracingKey is tracing ids list
TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
// Services services
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`

ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"`

FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`

rootConfig *RootConfig
Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"`
ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"`
FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"`
ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"`
// adaptive service
AdaptiveServiceVerbose bool `default:"false" yaml:"adaptive-service-verbose" json:"adaptive-service-verbose" property:"adaptive-service-verbose"`
rootConfig *RootConfig
}

func (ProviderConfig) Prefix() string {
Expand Down Expand Up @@ -83,6 +83,10 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
if err := c.check(); err != nil {
return err
}
// enable adaptive service verbose
if c.AdaptiveServiceVerbose {
aslimiter.Verbose = true
}
return nil
}

Expand Down
88 changes: 81 additions & 7 deletions filter/adaptivesvc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,92 @@ package adaptivesvc

import (
"context"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"fmt"
"github.com/pkg/errors"
"sync"
)

// Filter for adaptive service on server side.
type Filter struct{}
var (
adaptiveServiceProviderFilterOnce sync.Once
instance filter.Filter

func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
//eva, err := limiterMapperSingleton.getMethodCapacityEvaluator(invoker.GetURL(), invocation.MethodName(), )
panic("implement me")
ErrUpdaterNotFound = fmt.Errorf("updater not found")
ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type")
)

func init() {
extension.SetFilter(constant.AdaptiveServiceProviderFilterKey, newAdaptiveServiceProviderFilter)
}

// adaptiveServiceProviderFilter is for adaptive service on the provider side.
type adaptiveServiceProviderFilter struct{}

func newAdaptiveServiceProviderFilter() filter.Filter {
if instance == nil {
adaptiveServiceProviderFilterOnce.Do(func() {
instance = &adaptiveServiceProviderFilter{}
})
}
return instance
}

func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), invocation.MethodName())
if err != nil {
if errors.Is(err, ErrLimiterNotFoundOnMapper) {
// limiter is not found on the mapper, just create
// a new limiter
if l, err = limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(),
invocation.MethodName(), limiter.HillClimbingLimiter); err != nil {
return &protocol.RPCResult{Err: err}
}
} else {
// unexpected errors
return &protocol.RPCResult{Err: err}
}
}

updater, err := l.Acquire()
if err != nil {
return &protocol.RPCResult{Err: err}
}

invocation.Attributes()[constant.AdaptiveServiceUpdaterKey] = updater

return invoker.Invoke(ctx, invocation)
}

func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker,
func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {
panic("implement me")
// get updater from the attributes
updaterIface := invocation.AttributeByKey(constant.AdaptiveServiceUpdaterKey, nil)
if updaterIface == nil {
return &protocol.RPCResult{Err: ErrUpdaterNotFound}
}
updater, ok := updaterIface.(limiter.Updater)
if !ok {
return &protocol.RPCResult{Err: ErrUnexpectedUpdaterType}
}

err := updater.DoUpdate()
if err != nil {
return &protocol.RPCResult{Err: err}
}

// get limiter for the mapper
l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), invocation.MethodName())
if err != nil {
return &protocol.RPCResult{Err: err}
}

// set attachments to inform consumer of provider status
invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, l.Remaining())
invocation.SetAttachments(constant.AdaptiveServiceInflightKey, l.Inflight())

return result
}
22 changes: 21 additions & 1 deletion filter/adaptivesvc/limiter/hill_climbing.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package limiter

import (
Expand Down Expand Up @@ -113,12 +130,15 @@ func NewHillClimbingUpdater(limiter *HillClimbing) *HillClimbingUpdater {
return u
}

func (u *HillClimbingUpdater) DoUpdate(rtt, inflight uint64) error {
func (u *HillClimbingUpdater) DoUpdate() error {
defer func() {
u.limiter.inflight.Add(-1)
}()
VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq)

rtt := uint64(time.Now().Sub(u.startTime))
inflight := u.limiter.Inflight()

option, err := u.getOption(rtt, inflight)
if err != nil {
return err
Expand Down
Loading