Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add adaptive service #1649

Merged
merged 43 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8f8cf96
feat(adasvc): add infrastructure for adaptive service
justxuewei Oct 29, 2021
4dfc01e
feat(adasvc): reference config support adaptive service
justxuewei Oct 29, 2021
2ca1018
feat(adasvc): add p2c load balance
justxuewei Oct 29, 2021
e83a8a6
feat(cluster): add capacity evaluator interface
justxuewei Oct 31, 2021
f0c64e6
feat(cluster): add capacity updater
justxuewei Nov 1, 2021
69dd9d3
feat(cluster): add capacity updater
justxuewei Nov 3, 2021
2db43f7
feat(cluster): add fields to vegas capeva
justxuewei Nov 3, 2021
588f392
feat(cluster): refactor capeva interface
justxuewei Nov 4, 2021
e0a4d1a
feat(cluster): add more fields to vegas capeva
justxuewei Nov 4, 2021
9af855a
feat(cluster): vegas evaupdater done
justxuewei Nov 5, 2021
785c74c
Merge branch '3.0' into feat/adasvc
justxuewei Nov 6, 2021
1d5fca1
fix(common): fix typo
justxuewei Nov 6, 2021
7a537ff
fix(common): fix typo
justxuewei Nov 6, 2021
6d866b5
fix(cluster): add apache license
justxuewei Nov 6, 2021
865c058
feat(cluster): define limiter & update interface
justxuewei Nov 15, 2021
9d26e68
Merge branch '3.0' into feat/adasvc
justxuewei Nov 15, 2021
c3f9dbe
feat(cluster): remove cpu stat temporarily
justxuewei Nov 16, 2021
ff4e60b
feat(cluster): update hill climbing limiter
justxuewei Nov 17, 2021
0472bf1
feat(cluster): hill climbing done
justxuewei Nov 19, 2021
7e98a27
Merge branch '3.0' into feat/adasvc
justxuewei Nov 19, 2021
70ca487
fix(cluster): fix issue where init limitation is 0
justxuewei Nov 20, 2021
c949cef
Merge branch '3.0' into feat/adasvc
justxuewei Nov 20, 2021
ece019f
feat(cluster): provder-side filter done
justxuewei Nov 20, 2021
bf43b0b
fix(cluster): fix uint64 subtraction issue
justxuewei Nov 20, 2021
d8ca7f1
fix(cluster): add adaptivesvc filter to default service filters
justxuewei Nov 22, 2021
30dcb14
style: go fmt
justxuewei Nov 22, 2021
5b48b40
fix(filter): import adaptivesvc
justxuewei Nov 22, 2021
99f6919
Merge branch '3.0' into feat/adasvc
justxuewei Nov 22, 2021
f906714
Merge branch '3.0' into feat/adasvc
justxuewei Nov 25, 2021
c7edac5
fix(imports): import adaptivesvc cluster and p2c loadbalance
justxuewei Nov 26, 2021
73b4f70
fix(config): fix unexpectedly panic
justxuewei Nov 26, 2021
1fa48ca
feat(adasvc): add debug logs
justxuewei Dec 1, 2021
0acc52e
fix(adasvc): pass attachements with string
justxuewei Dec 1, 2021
e5de62f
feat(adasvc): detail debug logs
justxuewei Dec 2, 2021
6f1d7bf
fix(adasvc): fix log info
justxuewei Dec 2, 2021
4a04d4a
feat: detail dubbo logs
justxuewei Dec 5, 2021
4ed7505
feat: remove useless logs
justxuewei Dec 5, 2021
8b12eac
fix(adasvc): fix incorrect type
justxuewei Dec 5, 2021
9ac6763
Merge branch '3.0' into feat-adasvc
justxuewei Dec 5, 2021
c23612b
style: go fmt & dubbofmt
justxuewei Dec 5, 2021
0f7459b
fix: rpc result attrs is not initialized
justxuewei Dec 5, 2021
429a336
fix(protocol): fix result panic when attrs is not initialized
justxuewei Dec 5, 2021
bfc336e
Merge branch '3.0' into feat-adasvc
justxuewei Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: detail dubbo logs
  • Loading branch information
justxuewei committed Dec 5, 2021
commit 4a04d4a7d5613c0f61003db38f38eb221035c9e7
4 changes: 2 additions & 2 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation
result := invoker.Invoke(ctx, invocation)

// TODO(justxuewei): remove after test
logger.Debugf("%#v", result.Result())
logger.Debugf("result: Result: %#v", result.Attachments())

// update metrics
remainingStr := invocation.AttachmentsByKey(constant.AdaptiveServiceRemainingKey, "")
remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
Expand Down
12 changes: 11 additions & 1 deletion cluster/loadbalance/p2c/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
j = rand.Intn(len(invokers))
}
}
logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
i, j, invokers[i], invokers[j])

// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
Expand All @@ -81,6 +84,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
return invokers[i]
}
logger.Warnf("get method metrics err: %v", err)
Expand All @@ -90,6 +94,7 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
return invokers[j]
}
logger.Warnf("get method metrics err: %v", err)
Expand All @@ -99,18 +104,23 @@ func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.In
// Convert interface to int, if the type is unexpected, panic immediately
remainingI, ok := remainingIIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingIIface))
panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T",
metrics.HillClimbing, remainingIIface))
}

remainingJ, ok := remainingJIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}

logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)

// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
logger.Debugf("[P2C select] The invoker[i] was selected.")
return invokers[i]
}

logger.Debugf("[P2C select] The invoker[j] was selected.")
return invokers[j]
}
2 changes: 1 addition & 1 deletion common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const (
const (
AccessLogFilterKey = "accesslog"
ActiveFilterKey = "active"
AdaptiveServiceProviderFilterKey = "adaptive-service-provider"
AdaptiveServiceProviderFilterKey = "padasvc"
AuthConsumerFilterKey = "sign"
AuthProviderFilterKey = "auth"
EchoFilterKey = "echo"
Expand Down
4 changes: 2 additions & 2 deletions filter/adaptivesvc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result pro
}

// set attachments to inform consumer of provider status
invocation.SetAttachments(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
invocation.SetAttachments(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
result.AddAttachment(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining()))
result.AddAttachment(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight()))
logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.",
constant.AdaptiveServiceRemainingKey, l.Remaining(),
constant.AdaptiveServiceInflightKey, l.Inflight())
Expand Down
2 changes: 1 addition & 1 deletion filter/adaptivesvc/limiter/hill_climbing.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error

limitation = math.Max(1.0, math.Min(limitation, float64(maxLimitation)))
u.limiter.limitation.Store(uint64(limitation))
VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", oldLimitation, uint64(limitation))
VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", uint64(oldLimitation), uint64(limitation))
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
result.Rest = inv.Reply()
result.Attrs = rest.Attrs
}
logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest)

logger.Debugf("[DubboInvoker.Invoke] received rpc result form server: %s", result)

return &result
}
Expand Down
1 change: 1 addition & 0 deletions protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult
// p.Header.ResponseStatus = hessian.Response_OK
// p.Body = hessian.NewResponse(res, nil, result.Attachments())
}
result.Attrs = invokeResult.Attachments()
} else {
result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey())
}
Expand Down
6 changes: 4 additions & 2 deletions protocol/protocolwrapper/protocol_filter_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ func BuildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker {
}

if key == constant.ServiceFilterKey {
logger.Debugf("[BuildInvokerChain] The provider filters are %s, invoker: %s", filterNames, invoker)
logger.Debugf("[BuildInvokerChain] The provider invocation link is %s, invoker: %s",
strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
} else if key == constant.ReferenceFilterKey {
logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s", filterNames, invoker)
logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s",
strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker)
}
return next
}
Expand Down
6 changes: 6 additions & 0 deletions protocol/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package protocol

import "fmt"

// Result is a RPC result
type Result interface {
// SetError sets error.
Expand Down Expand Up @@ -92,3 +94,7 @@ func (r *RPCResult) Attachment(key string, defaultValue interface{}) interface{}
}
return v
}

func (r *RPCResult) String() string {
return fmt.Sprintf("&RPCResult{Rest: %v, Attrs: %v, Err: %v}", r.Rest, r.Attrs, r.Err)
}
1 change: 1 addition & 0 deletions remoting/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Codec interface {
}

type DecodeResult struct {
// IsRequest indicates whether the current request is a heartbeat request
IsRequest bool
Result interface{}
}
Expand Down
6 changes: 6 additions & 0 deletions remoting/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package remoting

import (
"fmt"
"sync"
"time"
)
Expand Down Expand Up @@ -115,6 +116,11 @@ func (response *Response) Handle() {
}
}

func (response *Response) String() string {
return fmt.Sprintf("&remoting.Response{ID: %d, Version: %s, SerialID: %d, Status: %d, Event: %v, Error: %v, Result: %v}",
response.ID, response.Version, response.SerialID, response.Status, response.Event, response.Error, response.Result)
}

type Options struct {
// connect timeout
ConnectTimeout time.Duration
Expand Down
4 changes: 4 additions & 0 deletions remoting/exchange_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url *comm
result.Rest = resultTmp.Rest
result.Attrs = resultTmp.Attrs
result.Err = resultTmp.Err
logger.Debugf("[ExchangeClient.Request] RPCResult from server: %v", resultTmp)
} else {
logger.Warnf("[ExchangeClient.Request] The type of result is unexpected, we want *protocol.RPCResult, "+
"but we got %T", rsp.response.Result)
}
return nil
}
Expand Down
17 changes: 11 additions & 6 deletions remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (h *RpcClientHandler) OnClose(session getty.Session) {
func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
result, ok := pkg.(remoting.DecodeResult)
if !ok {
logger.Errorf("illegal package")
logger.Errorf("[RpcClientHandler.OnMessage] getty client gets an unexpected rpc result: %#v", result)
return
}
// get heartbeat request from server
if result.IsRequest {
req := result.Result.(*remoting.Request)
if req.Event {
logger.Debugf("get rpc heartbeat request{%#v}", req)
logger.Debugf("[RpcClientHandler.OnMessage] getty client gets a heartbeat request: %#v", req)
resp := remoting.NewResponse(req.ID, req.Version)
resp.Status = hessian.Response_OK
resp.Event = req.Event
Expand All @@ -118,22 +118,23 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) {
reply(session, resp)
return
}
logger.Errorf("illegal request but not heartbeat. {%#v}", req)
logger.Errorf("[RpcClientHandler.OnMessage] unexpected heartbeat request: %#v", req)
return
}
h.timeoutTimes = 0
p := result.Result.(*remoting.Response)
// get heartbeat
if p.Event {
logger.Debugf("get rpc heartbeat response{%#v}", p)
logger.Debugf("[RpcClientHandler.OnMessage] getty client received a heartbeat response: %s", p)
if p.Error != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", p.Error)
logger.Errorf("[RpcClientHandler.OnMessage] a heartbeat response received by the getty client "+
"encounters an error: %v", p.Error)
}
p.Handle()
return
}

logger.Debugf("get rpc response{%#v}", p)
logger.Debugf("[RpcClientHandler.OnMessage] getty client received a response: %s", p)

h.conn.updateSession(session)

Expand Down Expand Up @@ -303,11 +304,15 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
attachments[constant.LocalAddr] = session.LocalAddr()
attachments[constant.RemoteAddr] = session.RemoteAddr()

logger.Debugf("[RpcServerHandler.OnMessage] invoc.Attrs: %v, invoc.MethodName: %s",
invoc.Attachments(), invoc.MethodName())

result := h.server.requestHandler(invoc)
if !req.TwoWay {
return
}
resp.Result = result
logger.Debugf("[RpcServerHandler.OnMessage] result attrs: %v, req: %v", result.Attrs, req)
reply(session, resp)
}

Expand Down