forked from square/quotaservice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bucket.go
369 lines (305 loc) · 9.69 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
// Licensed under the Apache License, Version 2.0
// Details: https://raw.githubusercontent.com/square/quotaservice/master/LICENSE
package quotaservice
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/square/quotaservice/config"
"github.com/square/quotaservice/events"
"github.com/square/quotaservice/logging"
pbconfig "github.com/square/quotaservice/protos/config"
)
// bucketContainer is a holder for configurations and bucket factories. It owns every namespace
// and, through them, every bucket created by the configured BucketFactory.
type bucketContainer struct {
// cfg is the service configuration installed by initLocked; it is nil until then.
cfg *pbconfig.ServiceConfig
// bf is the factory used to create all buckets held by this container.
bf BucketFactory
// n receives bucket-created events and is handed to every namespace the container creates.
n notifier
// namespaces maps a namespace name to its state.
namespaces map[string]*namespace
// defaultBucket is the global default bucket; it stays nil when the configuration has no
// GlobalDefaultBucket.
defaultBucket Bucket
// r is the reaper created in NewBucketContainer; applyWatch is called on it for each new named
// bucket and Stop stops it.
r *reaper
// The embedded lock is taken by the container's methods around accesses to namespaces.
sync.RWMutex // Embedded mutex
}
// namespace holds the runtime state of a single configured namespace: its configuration, its
// named buckets, and an optional namespace-scoped default bucket.
type namespace struct {
// n receives the bucket-removed events emitted by removeBucket.
n notifier
// name is the namespace's name, used when emitting events.
name string
// cfg is the namespace's current configuration; swapCfg replaces it.
cfg *pbconfig.NamespaceConfig
// buckets maps a bucket name to the named bucket (static or dynamic) registered under it.
buckets map[string]Bucket
// dynamicBucketCount is incremented when a dynamic bucket is added to buckets and decremented
// when one is removed.
dynamicBucketCount int32
// defaultBucket is the namespace-scoped default bucket; nil when none is configured.
defaultBucket Bucket
// The embedded lock is taken around accesses to buckets and when cfg is swapped.
sync.RWMutex // Embedded mutex
}
// notifier is the sink to which the bucket container and its namespaces send events such as
// bucket creation and removal.
type notifier interface {
// Emit hands the event e to the notifier.
Emit(e events.Event)
}
// Bucket is an abstraction of a token bucket.
type Bucket interface {
// Take retrieves tokens from a token bucket, returning the duration to wait before
// the number of tokens becomes available. A return value of 0 would mean no waiting is
// necessary. Success is true if tokens can be obtained, false if cannot be obtained within
// the specified maximum wait time.
Take(numTokens int64, maxWaitTime time.Duration) (waitTime time.Duration, success bool)
// Config returns the bucket configuration associated with this bucket.
Config() *pbconfig.BucketConfig
// Dynamic indicates whether a bucket is a dynamic one, or one that is statically defined in
// configuration.
Dynamic() bool
// Destroy indicates that a bucket has been removed from the BucketContainer, is no longer
// reachable, and should clean up any resources it may have open.
Destroy()
// ReportActivity indicates that an ActivityChannel is active. This method shouldn't block.
ReportActivity()
}
// DefaultBucket is a stateless type that supplies no-op implementations of the Destroy and
// ReportActivity methods declared by the Bucket interface.
type DefaultBucket struct{}

// Destroy does nothing; a DefaultBucket holds no resources to release.
func (DefaultBucket) Destroy() {}

// ReportActivity does nothing; a DefaultBucket does not track activity.
func (DefaultBucket) ReportActivity() {}
// removeBucket unregisters the bucket stored under bucketName, if there is one. While holding
// the namespace's write lock it deletes the map entry, decrements the dynamic bucket count when
// the bucket is dynamic, emits a bucket-removed event and finally calls Destroy on the bucket.
// It is a no-op when no bucket is registered under that name.
func (ns *namespace) removeBucket(bucketName string) {
	ns.Lock()
	defer ns.Unlock()

	victim := ns.buckets[bucketName]
	if victim == nil {
		// Nothing registered under this name.
		return
	}

	delete(ns.buckets, bucketName)
	if victim.Dynamic() {
		ns.dynamicBucketCount--
	}

	ns.n.Emit(events.NewBucketRemovedEvent(ns.name, bucketName, victim.Dynamic()))
	victim.Destroy()
}
// destroy calls Destroy() on the namespace's default bucket (when one is set) and then on every
// named bucket, all while holding the namespace's write lock. The buckets map itself is left
// untouched.
func (ns *namespace) destroy() {
	ns.Lock()
	defer ns.Unlock()

	if def := ns.defaultBucket; def != nil {
		def.Destroy()
	}
	for _, named := range ns.buckets {
		named.Destroy()
	}
}
// swapCfg replaces the namespace's configuration with newCfg while holding the namespace's write
// lock. Buckets already registered in the namespace are not modified by this method.
func (ns *namespace) swapCfg(newCfg *pbconfig.NamespaceConfig) {
ns.Lock()
defer ns.Unlock()
ns.cfg = newCfg
}
// BucketFactory creates buckets. The bucket container uses a single factory for the global
// default bucket, namespace default buckets and all named buckets.
type BucketFactory interface {
// Init initializes the bucket factory.
Init(cfg *pbconfig.ServiceConfig)
// NewBucket creates a new bucket named bucketName in the given namespace using cfg. dyn is
// true when the bucket is a dynamic one created from a namespace's dynamic bucket template.
// Callers in this file treat a nil return value as a failure to create the bucket.
NewBucket(namespace, bucketName string, cfg *pbconfig.BucketConfig, dyn bool) Bucket
// Client is an accessor to the underlying network client, if there is one.
Client() interface{}
}
// NewBucketContainer creates a new bucket container that builds buckets with bf and publishes
// events to n. The container starts with no namespaces and no configuration; a reaper is created
// for it from the reaper configuration r.
func NewBucketContainer(bf BucketFactory, n notifier, r config.ReaperConfig) (bc *bucketContainer) {
	bc = new(bucketContainer)
	bc.bf = bf
	bc.n = n
	bc.namespaces = map[string]*namespace{}

	// The reaper needs a reference to the container, so it is attached last.
	bc.r = newReaper(bc, r)
	return bc
}
// Init initializes the container from cfg. It takes the container's write lock and delegates the
// actual work to initLocked.
func (bc *bucketContainer) Init(cfg *pbconfig.ServiceConfig) {
bc.Lock()
defer bc.Unlock()
bc.initLocked(cfg)
}
// initLocked installs cfg as the container's configuration, creates the global default bucket
// when cfg defines one, and creates every namespace listed in cfg. A namespace whose Name is
// empty is given its key in cfg.Namespaces as its name. The caller must hold the container's
// write lock, as Init does.
//
// logging.Fatal is invoked if the container already has a configuration, or if a global default
// bucket already exists when one is about to be created.
func (bc *bucketContainer) initLocked(cfg *pbconfig.ServiceConfig) {
	if bc.cfg != nil {
		logging.Fatal("BucketContainer already has a config; cannot be re-initialized")
	}
	bc.cfg = cfg

	if globalCfg := cfg.GlobalDefaultBucket; globalCfg != nil {
		if bc.defaultBucket != nil {
			logging.Fatal("Global default bucket already exists when initializing")
		}
		bc.createGlobalDefaultBucketLocked(globalCfg)
	}

	for key, nsCfg := range cfg.Namespaces {
		if len(nsCfg.Name) == 0 {
			// Fall back to the map key when the namespace config carries no name.
			nsCfg.Name = key
		}
		bc.createNamespaceLocked(nsCfg)
	}
}
// createNamespaceLocked builds the namespace described by nsCfg and registers it in the
// container under nsCfg.Name. The namespace's default bucket is created when nsCfg defines one,
// and every bucket listed in nsCfg.Buckets is created as a static (non-dynamic) bucket. The
// caller must hold the container's write lock, since bc.namespaces is written here.
func (bc *bucketContainer) createNamespaceLocked(nsCfg *pbconfig.NamespaceConfig) {
	nsName := nsCfg.Name
	created := &namespace{
		n:       bc.n,
		name:    nsName,
		cfg:     nsCfg,
		buckets: make(map[string]Bucket),
	}
	if defCfg := nsCfg.DefaultBucket; defCfg != nil {
		created.defaultBucket = bc.bf.NewBucket(nsName, config.DefaultBucketName, defCfg, false)
	}

	// createNewNamedBucketFromCfg writes to the namespace's bucket map, so hold its lock.
	created.Lock()
	defer created.Unlock()
	for bName, bCfg := range nsCfg.Buckets {
		bc.createNewNamedBucketFromCfg(nsName, bName, created, bCfg, false)
	}
	bc.namespaces[nsName] = created
}
// createGlobalDefaultBucketLocked creates the global default bucket from cfg, as a non-dynamic
// bucket named config.DefaultBucketName in config.GlobalNamespace, and stores it on the
// container. It takes no lock itself; initLocked calls it with the container's write lock held.
func (bc *bucketContainer) createGlobalDefaultBucketLocked(cfg *pbconfig.BucketConfig) {
bc.defaultBucket = bc.bf.NewBucket(config.GlobalNamespace, config.DefaultBucketName, cfg, false)
}
// FindBucket locates a bucket for a given name and namespace. If the namespace doesn't exist, and
// if a global default bucket is configured, it will be used. If the namespace is available but the
// named bucket doesn't exist, it will either create a named bucket if the namespace has a dynamic
// bucket template (and space for more dynamic buckets is available), or fall back to the
// namespace-scoped default bucket if one is configured. This function is thread-safe, and may
// lazily create dynamic buckets or re-create statically defined buckets that have been
// invalidated.
//
// Return values:
//   - (bucket, nil) when a bucket was found or created; ReportActivity has been called on it.
//   - (nil, nil) when nothing matches and no dynamic bucket template is configured.
//   - (nil, error) when a bucket had to be created but creation failed.
func (bc *bucketContainer) FindBucket(namespace string, bucketName string) (Bucket, error) {
	// Read everything guarded by the container lock in a single critical section.
	bc.RLock()
	ns := bc.namespaces[namespace]
	globalDefault := bc.defaultBucket
	bc.RUnlock()

	if ns == nil {
		// Namespace doesn't exist. Use the global default bucket if possible.
		if globalDefault != nil {
			globalDefault.ReportActivity()
		}
		return globalDefault, nil
	}

	// Fast path under the read lock. ns.cfg is read here, rather than after unlocking, because
	// swapCfg replaces it under the namespace's write lock.
	ns.RLock()
	bucket := ns.buckets[bucketName]
	dynamicAllowed := ns.cfg.DynamicBucketTemplate != nil
	nsDefault := ns.defaultBucket
	ns.RUnlock()

	if bucket == nil && !dynamicAllowed {
		// Try a default for the namespace.
		bucket = nsDefault
	}
	if bucket != nil {
		bucket.ReportActivity()
		return bucket, nil
	}
	if !dynamicAllowed {
		return nil, nil
	}

	// Slow path: take the write lock and check again, since another goroutine may have created
	// the bucket between the read above and acquiring the write lock.
	ns.Lock()
	defer ns.Unlock()

	if bucket = ns.buckets[bucketName]; bucket != nil {
		bucket.ReportActivity()
		return bucket, nil
	}

	// createNewNamedBucket reports activity on the bucket it creates, so it is not repeated here.
	bucket = bc.createNewNamedBucket(namespace, bucketName, ns)
	if bucket == nil {
		// The message text is kept unchanged in case callers or tests match on it.
		return nil, errors.New("Cannot create dynamic bucket")
	}
	return bucket, nil
}
// createNewNamedBucket creates a new, named bucket in ns. When the namespace configuration
// defines a bucket called bucketName, that static configuration is used. Otherwise the bucket is
// dynamic and is built from the namespace's dynamic bucket template. May return nil if the named
// bucket is dynamic and the namespace has already reached its MaxDynamicBuckets setting (a
// setting of zero or less means no limit). Both callers in this file invoke it with the
// namespace's write lock held.
func (bc *bucketContainer) createNewNamedBucket(namespace, bucketName string, ns *namespace) Bucket {
	if staticCfg := ns.cfg.Buckets[bucketName]; staticCfg != nil {
		return bc.createNewNamedBucketFromCfg(namespace, bucketName, ns, staticCfg, false)
	}

	// Dynamic.
	limit := ns.cfg.MaxDynamicBuckets
	if limit > 0 && ns.dynamicBucketCount >= limit {
		logging.Printf("Bucket %v:%v numDynamicBuckets=%v maxDynamicBuckets=%v. Not creating more dynamic buckets.",
			namespace, bucketName, ns.dynamicBucketCount, limit)
		return nil
	}
	return bc.createNewNamedBucketFromCfg(namespace, bucketName, ns, ns.cfg.DynamicBucketTemplate, true)
}
// countDynamicBuckets returns how many of the named buckets currently registered in the given
// namespace report themselves as dynamic. It returns 0 when the namespace does not exist.
//
// The container's read lock is held while the namespace is looked up, and the namespace's read
// lock is held while its buckets are inspected (the same lock order as Exists), so the count is
// not taken while another goroutine is adding or removing buckets.
func (bc *bucketContainer) countDynamicBuckets(namespace string) int32 {
	bc.RLock()
	defer bc.RUnlock()

	ns := bc.namespaces[namespace]
	if ns == nil {
		// Unknown namespace: nothing to count, and ns must not be dereferenced.
		return 0
	}

	ns.RLock()
	defer ns.RUnlock()

	var c int32
	for _, b := range ns.buckets {
		if b.Dynamic() {
			c++
		}
	}
	return c
}
// createNewNamedBucketFromCfg creates a bucket from bCfg and registers it in ns under
// bucketName. In order, it emits a bucket-created event, asks the bucket factory for the bucket,
// passes it through the reaper's applyWatch, stores the result in ns.buckets, bumps the dynamic
// bucket count when dyn is true, and reports activity on the new bucket.
//
// It returns nil, leaving ns unchanged, when the factory returns nil; note that the
// bucket-created event has already been emitted by then. The second value returned by applyWatch
// is discarded. The callers in this file hold the namespace's write lock, since ns.buckets and
// ns.dynamicBucketCount are modified here.
func (bc *bucketContainer) createNewNamedBucketFromCfg(namespace, bucketName string, ns *namespace, bCfg *pbconfig.BucketConfig, dyn bool) Bucket {
	bc.n.Emit(events.NewBucketCreatedEvent(namespace, bucketName, dyn))

	created := bc.bf.NewBucket(namespace, bucketName, bCfg, dyn)
	if created == nil {
		// TODO(manik) why would this ever happen? Should we panic?
		return nil
	}

	created, _ = bc.r.applyWatch(created, namespace, bucketName, bCfg)
	ns.buckets[bucketName] = created
	if dyn {
		ns.dynamicBucketCount++
	}

	created.ReportActivity()
	return created
}
// NamespaceExists reports whether a namespace with the given name is registered in the
// container. The lookup is performed under the container's read lock.
func (bc *bucketContainer) NamespaceExists(namespace string) bool {
bc.RLock()
defer bc.RUnlock()
_, exists := bc.namespaces[namespace]
return exists
}
// Exists reports whether a bucket called name is currently registered in the given namespace.
// It returns false when the namespace itself is unknown. Only named buckets are considered;
// default buckets are not stored in the per-namespace bucket map. The container's read lock is
// taken first, followed by the namespace's read lock.
func (bc *bucketContainer) Exists(namespace, name string) bool {
	bc.RLock()
	defer bc.RUnlock()

	ns, found := bc.namespaces[namespace]
	if !found {
		return false
	}

	ns.RLock()
	defer ns.RUnlock()

	_, present := ns.buckets[name]
	return present
}
// removeBucket asks the given namespace to remove the named bucket. It returns false when the
// namespace does not exist and true otherwise, including when the namespace exists but holds no
// bucket of that name. The container's read lock is released before the namespace is asked to
// remove the bucket.
func (bc *bucketContainer) removeBucket(namespace, bucket string) bool {
	bc.RLock()
	ns := bc.namespaces[namespace]
	bc.RUnlock()

	if ns == nil {
		return false
	}
	ns.removeBucket(bucket)
	return true
}
// String renders a human-readable summary of the container: a line noting the global default
// bucket when one exists, followed by every namespace in lexical order, each with a note about
// its default bucket (if any) and the lexically sorted names of its buckets.
//
// The container's read lock is held for the whole call. Each namespace's read lock is held while
// its bucket names are copied out, so the bucket map is not iterated while another goroutine is
// adding or removing buckets in that namespace.
func (bc *bucketContainer) String() string {
	bc.RLock()
	defer bc.RUnlock()

	var buffer bytes.Buffer
	if bc.defaultBucket != nil {
		buffer.WriteString("Global default present\n\n")
	}

	// Map iteration order is random; sort the names for stable output.
	nsNames := make([]string, 0, len(bc.namespaces))
	for nsName := range bc.namespaces {
		nsNames = append(nsNames, nsName)
	}
	sort.Strings(nsNames)

	for _, nsName := range nsNames {
		ns := bc.namespaces[nsName]

		// Snapshot the namespace under its own lock, then format without holding it.
		ns.RLock()
		hasDefault := ns.defaultBucket != nil
		bucketNames := make([]string, 0, len(ns.buckets))
		for bName := range ns.buckets {
			bucketNames = append(bucketNames, bName)
		}
		ns.RUnlock()
		sort.Strings(bucketNames)

		fmt.Fprintf(&buffer, " * Namespace: %v\n", nsName)
		if hasDefault {
			buffer.WriteString(" + Default present\n")
		}
		for _, bName := range bucketNames {
			fmt.Fprintf(&buffer, " + %v\n", bName)
		}
		buffer.WriteString("\n")
	}

	return buffer.String()
}
// Stop stops the container's reaper by calling its stop method. Buckets and namespaces held by
// the container are not touched by this method.
func (bc *bucketContainer) Stop() {
bc.r.stop()
}