-
Notifications
You must be signed in to change notification settings - Fork 22
/
ws_cli_farm_conn_info.go
96 lines (82 loc) · 2.82 KB
/
ws_cli_farm_conn_info.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package connector
import (
"fmt"
"math"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
. "github.com/torchcc/crank4go/util"
)
type WebsocketClientFarm struct {
maxSlidingWindowSize int
websocketClientFarmInfoMap map[string]int
connectorSockets *sync.Map
}
// NewWebsocketClientFarm Set by service. maxSlidingWindowSize = 2 * slidingWindowSize because connector add and remove websocket in different goroutines,
// which may result in adding websocket goes before removing websocket.
func NewWebsocketClientFarm(slidingWindowSize int) *WebsocketClientFarm {
return &WebsocketClientFarm{
maxSlidingWindowSize: slidingWindowSize * 2,
websocketClientFarmInfoMap: nil,
connectorSockets: &sync.Map{},
}
}
func (f *WebsocketClientFarm) addWebsocket(registerUrl string) {
var atomicInt int32
atomicInt = 0
addr, _ := f.connectorSockets.LoadOrStore(registerUrl, &atomicInt)
atomic.AddInt32(addr.(*int32), 1)
LOG.Debugf("add websocket for registerUrl=%s, current websocketClientFarm=%#v", registerUrl, f)
}
func (f *WebsocketClientFarm) removeWebsocket(registerUrl string) {
if addr, ok := f.connectorSockets.Load(registerUrl); ok {
atomic.AddInt32(addr.(*int32), -1)
LOG.Debugf("remove websocket for registerUrl=%s, current websocketClientFarm=%#v", registerUrl, f)
}
}
func (f *WebsocketClientFarm) isSafeToAddWebsocket(registerUrl *url.URL) bool {
isNotDeregisterPath := !strings.HasPrefix(registerUrl.Path, "/deregister")
var idleSocketNum int32
if addr, ok := f.connectorSockets.Load(registerUrl.String()); ok {
idleSocketNum = *(addr.(*int32))
}
return isNotDeregisterPath && f.maxSlidingWindowSize > int(idleSocketNum)
}
func (f *WebsocketClientFarm) ToMap() map[string]int {
f.websocketClientFarmInfoMap = make(map[string]int)
f.connectorSockets.Range(func(key, value interface{}) bool {
f.websocketClientFarmInfoMap[key.(string)] = int(*(value.(*int32)))
return true
})
return f.websocketClientFarmInfoMap
}
type ConnectionInfo struct {
routerURI *url.URL
connIndex int
curConnAttempts *int64
}
func NewConnectionInfo(routerURI *url.URL, connIndex int) *ConnectionInfo {
return &ConnectionInfo{
routerURI: routerURI,
connIndex: connIndex,
curConnAttempts: new(int64),
}
}
func (ci *ConnectionInfo) OnConnectedSuccessfully() {
atomic.StoreInt64(ci.curConnAttempts, 0)
}
func (ci *ConnectionInfo) OnConnectionStarting() {
atomic.AddInt64(ci.curConnAttempts, 1)
}
func (ci *ConnectionInfo) RetryAfterMillis() int64 {
return int64(500 * math.Min(10000, math.Pow(2, float64(atomic.LoadInt64(ci.curConnAttempts)))))
}
func (ci *ConnectionInfo) String() string {
return "ConnectionInfo{" +
"routerURI=" + ci.routerURI.String() +
", connIndex=" + strconv.Itoa(ci.connIndex) +
fmt.Sprintf(", curConnAttempts=%d", atomic.LoadInt64(ci.curConnAttempts)) +
"}"
}