Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add adaptive service #1649

Merged
merged 43 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8f8cf96
feat(adasvc): add infrastructure for adaptive service
justxuewei Oct 29, 2021
4dfc01e
feat(adasvc): reference config support adaptive service
justxuewei Oct 29, 2021
2ca1018
feat(adasvc): add p2c load balance
justxuewei Oct 29, 2021
e83a8a6
feat(cluster): add capacity evaluator interface
justxuewei Oct 31, 2021
f0c64e6
feat(cluster): add capacity updater
justxuewei Nov 1, 2021
69dd9d3
feat(cluster): add capacity updater
justxuewei Nov 3, 2021
2db43f7
feat(cluster): add fields to vegas capeva
justxuewei Nov 3, 2021
588f392
feat(cluster): refactor capeva interface
justxuewei Nov 4, 2021
e0a4d1a
feat(cluster): add more fields to vegas capeva
justxuewei Nov 4, 2021
9af855a
feat(cluster): vegas evaupdater done
justxuewei Nov 5, 2021
785c74c
Merge branch '3.0' into feat/adasvc
justxuewei Nov 6, 2021
1d5fca1
fix(common): fix typo
justxuewei Nov 6, 2021
7a537ff
fix(common): fix typo
justxuewei Nov 6, 2021
6d866b5
fix(cluster): add apache license
justxuewei Nov 6, 2021
865c058
feat(cluster): define limiter & update interface
justxuewei Nov 15, 2021
9d26e68
Merge branch '3.0' into feat/adasvc
justxuewei Nov 15, 2021
c3f9dbe
feat(cluster): remove cpu stat temporarily
justxuewei Nov 16, 2021
ff4e60b
feat(cluster): update hill climbing limiter
justxuewei Nov 17, 2021
0472bf1
feat(cluster): hill climbing done
justxuewei Nov 19, 2021
7e98a27
Merge branch '3.0' into feat/adasvc
justxuewei Nov 19, 2021
70ca487
fix(cluster): fix issue where init limitation is 0
justxuewei Nov 20, 2021
c949cef
Merge branch '3.0' into feat/adasvc
justxuewei Nov 20, 2021
ece019f
feat(cluster): provder-side filter done
justxuewei Nov 20, 2021
bf43b0b
fix(cluster): fix uint64 subtraction issue
justxuewei Nov 20, 2021
d8ca7f1
fix(cluster): add adaptivesvc filter to default service filters
justxuewei Nov 22, 2021
30dcb14
style: go fmt
justxuewei Nov 22, 2021
5b48b40
fix(filter): import adaptivesvc
justxuewei Nov 22, 2021
99f6919
Merge branch '3.0' into feat/adasvc
justxuewei Nov 22, 2021
f906714
Merge branch '3.0' into feat/adasvc
justxuewei Nov 25, 2021
c7edac5
fix(imports): import adaptivesvc cluster and p2c loadbalance
justxuewei Nov 26, 2021
73b4f70
fix(config): fix unexpectedly panic
justxuewei Nov 26, 2021
1fa48ca
feat(adasvc): add debug logs
justxuewei Dec 1, 2021
0acc52e
fix(adasvc): pass attachements with string
justxuewei Dec 1, 2021
e5de62f
feat(adasvc): detail debug logs
justxuewei Dec 2, 2021
6f1d7bf
fix(adasvc): fix log info
justxuewei Dec 2, 2021
4a04d4a
feat: detail dubbo logs
justxuewei Dec 5, 2021
4ed7505
feat: remove useless logs
justxuewei Dec 5, 2021
8b12eac
fix(adasvc): fix incorrect type
justxuewei Dec 5, 2021
9ac6763
Merge branch '3.0' into feat-adasvc
justxuewei Dec 5, 2021
c23612b
style: go fmt & dubbofmt
justxuewei Dec 5, 2021
0f7459b
fix: rpc result attrs is not initialized
justxuewei Dec 5, 2021
429a336
fix(protocol): fix result panic when attrs is not initialized
justxuewei Dec 5, 2021
bfc336e
Merge branch '3.0' into feat-adasvc
justxuewei Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(cluster): add more fields to vegas capeva
  • Loading branch information
justxuewei committed Nov 4, 2021
commit e0a4d1a1d30ef30f78b2ffca28244cd97d7ed1c6
4 changes: 2 additions & 2 deletions filter/adaptivesvc/capeva/capacity_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package capeva

type CapacityEvaluator interface {
// EstimatedCapacity is estimated capacity, which reflects the maximum requests handled by the provider.
EstimatedCapacity() int64
EstimatedCapacity() uint64

// ActualCapacity is actual requests on the provider.
ActualCapacity() int64
ActualCapacity() uint64

// NewCapacityUpdater returns a capacity updater
NewCapacityUpdater() CapacityUpdater
Expand Down
35 changes: 35 additions & 0 deletions filter/adaptivesvc/capeva/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package capeva

import "go.uber.org/atomic"

func after(crt, next *atomic.Uint64) bool {
return crt.Load() > next.Load()
}

// setValueIfLess sets newValue to v if newValue is less than v
func setValueIfLess(v *atomic.Uint64, newValue uint64) {
vuint64 := v.Load()
for vuint64 > newValue {
if !v.CAS(vuint64, newValue) {
vuint64 = v.Load()
}
}
}

func slowStart(est, thresh *atomic.Uint64) {
var estValue, threshValue, newEst uint64

for {
estValue = est.Load()
threshValue = thresh.Load()

newEst = estValue * 2
if threshValue < newEst {
newEst = threshValue
}

if est.CAS(estValue, newEst) {
break
}
}
}
57 changes: 40 additions & 17 deletions filter/adaptivesvc/capeva/vegas.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,72 @@ package capeva

import (
"go.uber.org/atomic"
"math"
)

const (
defaultRoundSize uint64 = 100
defaultThreshold uint64 = 100

defaultRoundSize uint64 = 10

defaultAlpha uint64 = 2
defaultBeta uint64 = 4
defaultGamma uint64 = 1
)

// Vegas is a capacity evaluator using Vegas congestion avoid.
// RTT is not exactly same as TCP's,
// in this case, RTT means the time that the provider perform a method.
type Vegas struct {
Estimated, Actual *atomic.Uint64
Estimated, Actual, Threshold *atomic.Uint64

// TODO(justxuewei): load values from config
Alpha, Beta, Gamma uint64

// RoundSize specifies the size of the round, which reflects on the speed of updating estimation.
// The MinRTT and CntRTT will be reset in the next round.
// The smaller RoundSize is, the faster reevaluating estimated capacity is.
RoundSize uint64

Seq *atomic.Uint64
NextRoundSeq *atomic.Uint64
Seq *atomic.Uint64
NextRoundLeftBound *atomic.Uint64

BaseRTT *atomic.Uint64
MinRTT *atomic.Uint64
CntRTT *atomic.Uint64
CntRTT *atomic.Uint64 // not used so far
}

func NewVegas() *Vegas {
estimated := atomic.NewUint64(1)
threshold := atomic.NewUint64(defaultThreshold)

minRTT := atomic.NewUint64(math.MaxUint64)

return &Vegas{
Estimated: &atomic.Uint64{},
Actual: &atomic.Uint64{},
RoundSize: defaultRoundSize,
Seq: &atomic.Uint64{},
NextRoundSeq: &atomic.Uint64{},
BaseRTT: &atomic.Uint64{},
MinRTT: &atomic.Uint64{},
CntRTT: &atomic.Uint64{},
Estimated: estimated,
Actual: &atomic.Uint64{},
Threshold: threshold,

Alpha: defaultAlpha,
Beta: defaultBeta,
Gamma: defaultGamma,
RoundSize: defaultRoundSize,

Seq: &atomic.Uint64{},
NextRoundLeftBound: &atomic.Uint64{},

BaseRTT: &atomic.Uint64{},
MinRTT: minRTT,
CntRTT: &atomic.Uint64{},
}
}

func (v *Vegas) EstimatedCapacity() int64 {
panic("implement me")
func (v *Vegas) EstimatedCapacity() uint64 {
return v.Estimated.Load()
}

func (v *Vegas) ActualCapacity() int64 {
panic("implement me")
func (v *Vegas) ActualCapacity() uint64 {
return v.Actual.Load()
}

func (v *Vegas) NewCapacityUpdater() CapacityUpdater {
Expand Down
60 changes: 44 additions & 16 deletions filter/adaptivesvc/capeva/vegas_updater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package capeva

import "time"
import (
"go.uber.org/atomic"
"math"
"time"
)

type vegasUpdater struct {
eva *Vegas
Expand All @@ -15,28 +19,52 @@ func newVegasUpdater(eva *Vegas) CapacityUpdater {
}
}

func (v *vegasUpdater) Succeed() {
v.updateAfterReturn()
func (u *vegasUpdater) Succeed() {
u.updateAfterReturn()
}

func (v *vegasUpdater) Failed() {
v.updateAfterReturn()
func (u *vegasUpdater) Failed() {
u.updateAfterReturn()
}

func (v *vegasUpdater) updateAfterReturn() {
v.eva.Actual.Add(-1)
v.updateRTTs()
func (u *vegasUpdater) updateAfterReturn() {
u.eva.Actual.Add(-1)
u.updateRTTs()

u.reevaEstCap()
}

func (v *vegasUpdater) updateRTTs() {
func (u *vegasUpdater) updateRTTs() {
// update BaseRTT
curRTT := uint64(time.Now().Sub(v.startedTime))
oldBaseRTT := v.eva.BaseRTT.Load()
for oldBaseRTT > curRTT {
if !v.eva.BaseRTT.CAS(oldBaseRTT, curRTT) {
oldBaseRTT = v.eva.BaseRTT.Load()
}
}
curRTT := uint64(time.Now().Sub(u.startedTime))
setValueIfLess(u.eva.BaseRTT, curRTT)
// update MinRTT
setValueIfLess(u.eva.MinRTT, curRTT)
// update CntRTT
u.eva.CntRTT.Add(1)
}

// reevaEstCap reevaluates estimated capacity if the round
func (u *vegasUpdater) reevaEstCap() {
if after(u.eva.Seq, u.eva.NextRoundLeftBound) {
// update next round
u.eva.NextRoundLeftBound.Add(u.eva.RoundSize)

rtt := u.eva.MinRTT.Load()
baseRTT := u.eva.BaseRTT.Load()

thresh := u.eva.Threshold.Load()
estCap := u.eva.EstimatedCapacity()

targetEstCap := estCap * baseRTT / rtt
diff := estCap * (rtt - baseRTT) / baseRTT

if diff > u.eva.Gamma && estCap <= thresh {

}

// reset MinRTT & CntRTT
u.eva.MinRTT.Store(math.MaxUint64)
u.eva.CntRTT.Store(0)
}
}