feat: Distributed storage (#13)
creativecreature committed Jun 7, 2024
commit 0b2b9df (parent: 72ecfa5)
Showing 37 changed files with 1,960 additions and 363 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -207,7 +207,7 @@ linters-settings:
gocognit:
# Minimal code complexity to report.
# Default: 30 (but we recommend 10-20)
min-complexity: 60
min-complexity: 70

gocritic:
# Settings passed to gocritic.
348 changes: 235 additions & 113 deletions README.md

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions benchmark_test.go
@@ -56,7 +56,9 @@ func BenchmarkGetConcurrent(b *testing.B) {
numShards := 100
ttl := time.Hour
evictionPercentage := 5
c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage)
c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
)
c.Set(cacheKey, "value")

metrics := make(benchmarkMetrics[string], 0)
@@ -77,7 +79,9 @@ func BenchmarkSetConcurrent(b *testing.B) {
numShards := 10_000
ttl := time.Hour
evictionPercentage := 5
c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage)
c := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithNoContinuousEvictions(),
)

metrics := make(benchmarkMetrics[string], 0)
b.ResetTimer()
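Both benchmarks above now pass sturdyc.WithNoContinuousEvictions(), an option introduced by this commit; judging from the disableContinuousEvictions flag added to cache.go further down, it stops the client from starting the background goroutine that periodically sweeps shards for expired entries. A minimal sketch of the same setup outside a benchmark follows — the module path is assumed from the committer's handle and is not stated anywhere in this diff:

package main

import (
	"time"

	"github.com/creativecreature/sturdyc" // assumed module path
)

func main() {
	// Same arguments as the benchmarks above: capacity, number of shards,
	// TTL, and the percentage of entries to evict when a shard is full.
	capacity := 10_000
	numShards := 100
	ttl := time.Hour
	evictionPercentage := 5

	// WithNoContinuousEvictions (new in this commit) skips the background
	// goroutine that periodically sweeps shards for expired entries (see
	// the performContinuousEvictions change in cache.go below).
	cache := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithNoContinuousEvictions(),
	)
	cache.Set("key", "value")
}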
70 changes: 38 additions & 32 deletions buffer_test.go
@@ -33,9 +33,10 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -47,7 +48,7 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {

fetchObserver := NewFetchObserver(1)
fetchObserver.BatchResponse(ids)
sturdyc.GetFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)

<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 1)
@@ -59,11 +60,11 @@ func TestBatchIsRefreshedWhenTheTimeoutExpires(t *testing.T) {
clock.Add(maxRefreshDelay + time.Second)

// We'll create a batch function that stores the ids it was called with, and
// then invoke "GetFetchBatch". We are going to request 3 ids, which is less
// then invoke "GetOrFetchBatch". We are going to request 3 ids, which is less
// than our wanted batch size. This should lead to a batch being scheduled.
recordsToRequest := []string{"1", "2", "3"}
fetchObserver.BatchResponse(recordsToRequest)
sturdyc.GetFetchBatch(ctx, client, recordsToRequest, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, recordsToRequest, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
time.Sleep(10 * time.Millisecond)
fetchObserver.AssertFetchCount(t, 1)

@@ -98,9 +99,10 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -111,7 +113,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {

fetchObserver := NewFetchObserver(1)
fetchObserver.BatchResponse(ids)
sturdyc.GetFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)

<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 1)
@@ -123,17 +125,17 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
clock.Add(maxRefreshDelay + time.Second)

// We'll create a batch function that stores the ids it was called with, and
// then invoke "GetFetchBatch". We are going to request 3 ids, which is less
// then invoke "GetOrFetchBatch". We are going to request 3 ids, which is less
// than our ideal batch size. This should lead to a batch being scheduled.
firstBatchOfRequestedRecords := []string{"1", "2", "3"}
fetchObserver.BatchResponse([]string{"1", "2", "3"})
sturdyc.GetFetchBatch(ctx, client, firstBatchOfRequestedRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, firstBatchOfRequestedRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)

// Now, we'll move the clock forward 5 seconds before requesting another 3 records.
// Our wanted batch size is 10, hence this should NOT be enough to trigger a refresh.
clock.Add(5 * time.Second)
secondBatchOfRecords := []string{"4", "5", "6"}
sturdyc.GetFetchBatch(ctx, client, secondBatchOfRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, secondBatchOfRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)

// Move the clock another 10 seconds. Again, this should not trigger a refresh. We'll
// perform a sleep here too just to ensure that the buffer is not refreshed prematurely.
@@ -144,7 +146,7 @@ func TestBatchIsRefreshedWhenTheBufferSizeIsReached(t *testing.T) {
// In the third batch I'm going to request 6 records. With that, we've
// requested 12 records in total, which is greater than our buffer size of 10.
thirdBatchOfRecords := []string{"7", "8", "9", "10", "11", "12"}
sturdyc.GetFetchBatch(ctx, client, thirdBatchOfRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, thirdBatchOfRecords, client.BatchKeyFn("item"), fetchObserver.FetchBatch)

// An actual refresh should happen for the first 10 ids, while the 2 that
// overflow should get scheduled for a refresh. Block until the request has
@@ -191,9 +193,10 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -205,7 +208,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {

fetchObserver := NewFetchObserver(1)
fetchObserver.BatchResponse(ids)
sturdyc.GetFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, ids, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.AssertRequestedRecords(t, ids)
@@ -221,7 +224,7 @@ func TestBatchIsNotRefreshedByDuplicates(t *testing.T) {
for i := 0; i < numRequests; i++ {
go func(i int) {
id := []string{strconv.Itoa((i % 3) + 1)}
sturdyc.GetFetchBatch(ctx, client, id, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, id, client.BatchKeyFn("item"), fetchObserver.FetchBatch)
wg.Done()
}(i)
}
@@ -260,9 +263,10 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
c := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -283,9 +287,9 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {

fetchObserver := NewFetchObserver(1)
fetchObserver.BatchResponse(seedIDs)
sturdyc.GetFetchBatch(ctx, c, seedIDs, c.PermutatedBatchKeyFn(prefix, optsOne), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, c, seedIDs, c.PermutatedBatchKeyFn(prefix, optsOne), fetchObserver.FetchBatch)
<-fetchObserver.FetchCompleted
sturdyc.GetFetchBatch(ctx, c, seedIDs, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, c, seedIDs, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 2)
fetchObserver.Clear()
@@ -304,12 +308,12 @@ func TestBatchesAreGroupedByPermutations(t *testing.T) {
optsTwoBatch2 := []string{"6", "7", "8"}

// Request the first batch of records. This should wait for additional IDs.
sturdyc.GetFetchBatch(ctx, c, optsOneIDs, c.PermutatedBatchKeyFn(prefix, optsOne), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, c, optsOneIDs, c.PermutatedBatchKeyFn(prefix, optsOne), fetchObserver.FetchBatch)

// Next, we're requesting ids 4-8 with the second options which should exceed the buffer size for that permutation.
fetchObserver.BatchResponse([]string{"4", "5", "6", "7", "8"})
sturdyc.GetFetchBatch(ctx, c, optsTwoBatch1, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
sturdyc.GetFetchBatch(ctx, c, optsTwoBatch2, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, c, optsTwoBatch1, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, c, optsTwoBatch2, c.PermutatedBatchKeyFn(prefix, optsTwo), fetchObserver.FetchBatch)

<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 3)
@@ -348,9 +352,10 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -363,7 +368,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {

fetchObserver := NewFetchObserver(5)
fetchObserver.BatchResponse(seedIDs)
sturdyc.GetFetchBatch(ctx, client, seedIDs, client.BatchKeyFn(cacheKeyPrefix), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, seedIDs, client.BatchKeyFn(cacheKeyPrefix), fetchObserver.FetchBatch)
<-fetchObserver.FetchCompleted
fetchObserver.AssertFetchCount(t, 1)
fetchObserver.AssertRequestedRecords(t, seedIDs)
@@ -379,7 +384,7 @@ func TestLargeBatchesAreChunkedCorrectly(t *testing.T) {
for i := 1; i <= 50; i++ {
largeBatch = append(largeBatch, strconv.Itoa(i))
}
sturdyc.GetFetchBatch(ctx, client, largeBatch, client.BatchKeyFn(cacheKeyPrefix), fetchObserver.FetchBatch)
sturdyc.GetOrFetchBatch(ctx, client, largeBatch, client.BatchKeyFn(cacheKeyPrefix), fetchObserver.FetchBatch)
for i := 0; i < 10; i++ {
<-fetchObserver.FetchCompleted
}
@@ -409,9 +414,10 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
// 1. The number of scheduled refreshes exceeds the specified 'batchSize'.
// 2. The 'batchBufferTimeout' threshold is exceeded.
client := sturdyc.New[any](capacity, numShards, ttl, evictionPercentage,
sturdyc.WithBackgroundRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
sturdyc.WithMissingRecordStorage(),
sturdyc.WithRefreshBuffering(batchSize, batchBufferTimeout),
sturdyc.WithRefreshCoalescing(batchSize, batchBufferTimeout),
sturdyc.WithClock(clock),
)

@@ -420,7 +426,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
}

records := []string{"1", "2", "3"}
res, _ := sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
res, _ := sturdyc.GetOrFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo-" + id}
Expand All @@ -433,7 +439,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
}

clock.Add(time.Minute * 45)
sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
sturdyc.GetOrFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo2-" + id}
@@ -450,7 +456,7 @@ func TestValuesAreUpdatedCorrectly(t *testing.T) {
clock.Add(time.Minute * 5)
time.Sleep(50 * time.Millisecond)

resTwo, _ := sturdyc.GetFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
resTwo, _ := sturdyc.GetOrFetchBatch[Foo](ctx, client, records, client.BatchKeyFn("item"), func(_ context.Context, ids []string) (map[string]Foo, error) {
values := make(map[string]Foo, len(ids))
for _, id := range ids {
values[id] = Foo{Value: "foo3-" + id}
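The buffer_test.go changes are renames at the call sites: WithBackgroundRefreshes becomes WithEarlyRefreshes, WithRefreshBuffering becomes WithRefreshCoalescing, and GetFetchBatch becomes GetOrFetchBatch. A minimal sketch of how those options fit together outside a test follows; the option and function signatures are taken from the calls shown above, while the module path and the concrete durations are assumptions:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/creativecreature/sturdyc" // assumed module path
)

func main() {
	capacity, numShards, evictionPercentage := 10_000, 100, 5
	ttl := 2 * time.Hour

	// Early refreshes: a record becomes eligible for a background refresh
	// somewhere between the min and max delay; failed refreshes are retried
	// on the given interval.
	minRefreshDelay := 10 * time.Second
	maxRefreshDelay := 30 * time.Second
	refreshRetryInterval := 5 * time.Second

	// Refresh coalescing: scheduled refreshes are buffered until either
	// batchSize ids have accumulated or the buffer timeout expires,
	// whichever happens first (the two conditions the tests above assert).
	batchSize := 10
	bufferTimeout := 15 * time.Second

	client := sturdyc.New[string](capacity, numShards, ttl, evictionPercentage,
		sturdyc.WithEarlyRefreshes(minRefreshDelay, maxRefreshDelay, refreshRetryInterval),
		sturdyc.WithMissingRecordStorage(),
		sturdyc.WithRefreshCoalescing(batchSize, bufferTimeout),
	)

	// GetOrFetchBatch returns cached values and calls the fetch function for
	// any ids that are missing or due for a refresh.
	fetchFn := func(_ context.Context, ids []string) (map[string]string, error) {
		values := make(map[string]string, len(ids))
		for _, id := range ids {
			values[id] = "value-" + id // stand-in for a real data-source call
		}
		return values, nil
	}

	ids := []string{"1", "2", "3"}
	res, err := sturdyc.GetOrFetchBatch(context.Background(), client, ids, client.BatchKeyFn("item"), fetchFn)
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	fmt.Println(res)
}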
55 changes: 17 additions & 38 deletions cache.go
@@ -9,17 +9,6 @@ import (
"github.com/cespare/xxhash"
)

type MetricsRecorder interface {
CacheHit()
CacheMiss()
Eviction()
ForcedEviction()
EntriesEvicted(int)
ShardIndex(int)
CacheBatchRefreshSize(size int)
ObserveCacheSize(callback func() int)
}

// FetchFn represents a function that can be used to fetch a single record from a data source.
type FetchFn[T any] func(ctx context.Context) (T, error)

@@ -34,10 +23,11 @@ type KeyFn func(id string) string

// Config represents the configuration that can be applied to the cache.
type Config struct {
clock Clock
evictionInterval time.Duration
metricsRecorder MetricsRecorder
log Logger
clock Clock
evictionInterval time.Duration
disableContinuousEvictions bool
metricsRecorder DistributedMetricsRecorder
log Logger

refreshInBackground bool
minRefreshTime time.Duration
@@ -54,6 +44,10 @@ type Config struct {
useRelativeTimeKeyFormat bool
keyTruncation time.Duration
getSize func() int

distributedStorage DistributedStorageWithDeletions
distributedEarlyRefreshes bool
distributedRefreshAfterDuration time.Duration
}

// Client represents a cache client that can be used to store and retrieve values.
@@ -103,20 +97,19 @@ func New[T any](capacity, numShards int, ttl time.Duration, evictionPercentage i
client.nextShard = 0

// Run evictions on the shards in a separate goroutine.
client.startEvictions()
if !cfg.disableContinuousEvictions {
client.performContinuousEvictions()
}

return client
}

// startEvictions is going to be running in a separate goroutine that we're going to prevent from ever exiting.
func (c *Client[T]) startEvictions() {
// performContinuousEvictions is going to be running in a separate goroutine that we're going to prevent from ever exiting.
func (c *Client[T]) performContinuousEvictions() {
go func() {
ticker, stop := c.clock.NewTicker(c.evictionInterval)
defer stop()
for range ticker {
if c.metricsRecorder != nil {
c.metricsRecorder.Eviction()
}
c.shards[c.nextShard].evictExpired()
c.nextShard = (c.nextShard + 1) % len(c.shards)
}
@@ -127,24 +120,10 @@ func (c *Client[T]) startEvictions() {
func (c *Client[T]) getShard(key string) *shard[T] {
hash := xxhash.Sum64String(key)
shardIndex := hash % uint64(len(c.shards))
if c.metricsRecorder != nil {
c.metricsRecorder.ShardIndex(int(shardIndex))
}
c.reportShardIndex(int(shardIndex))
return c.shards[shardIndex]
}

// reportCacheHits is used to report cache hits and misses to the metrics recorder.
func (c *Client[T]) reportCacheHits(cacheHit bool) {
if c.metricsRecorder == nil {
return
}
if !cacheHit {
c.metricsRecorder.CacheMiss()
return
}
c.metricsRecorder.CacheHit()
}

func (c *Client[T]) get(key string) (value T, exists, ignore, refresh bool) {
shard := c.getShard(key)
val, exists, ignore, refresh := shard.get(key)
@@ -171,7 +150,7 @@ func (c *Client[T]) GetMany(keys []string) map[string]T {
return records
}

// GetManyKeyFn follows the same API as GetFetchBatch and PassthroughBatch.
// GetManyKeyFn follows the same API as GetOrFetchBatch and PassthroughBatch.
// You provide it with a slice of IDs and a keyFn, which is applied to create
// the cache key. The returned map uses the IDs as keys instead of the cache key.
// If you've used ScanKeys to retrieve the actual keys, you can retrieve the records
@@ -211,7 +190,7 @@ func (c *Client[T]) SetMany(records map[string]T) bool {
return triggeredEviction
}

// SetManyKeyFn follows the same API as GetFetchBatch and PassThroughBatch. It
// SetManyKeyFn follows the same API as GetOrFetchBatch and PassThroughBatch. It
// takes a map of records where the keyFn is applied to each key in the map
// before it's stored in the cache.
func (c *Client[T]) SetManyKeyFn(records map[string]T, cacheKeyFn KeyFn) bool {
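Most of the cache.go changes are plumbing for the commit's headline feature: the new Config fields distributedStorage (typed DistributedStorageWithDeletions), distributedEarlyRefreshes, and distributedRefreshAfterDuration. The interface itself, and the options that populate these fields, live in files that are not part of this excerpt. Purely as an illustration of what a shared storage backend with explicit deletions tends to look like, here is a hypothetical adapter — every name in it is an assumption, not something taken from this diff:

package main

import (
	"context"
	"fmt"
	"sync"
)

// distributedStorage is a hypothetical shape for a shared storage backend
// (e.g. Redis or Memcached) that supports explicit deletions. It is NOT the
// sturdyc interface, which is defined in a file outside this diff.
type distributedStorage interface {
	Get(ctx context.Context, key string) ([]byte, bool)
	Set(ctx context.Context, key string, value []byte)
	Delete(ctx context.Context, key string)
}

// inMemoryStorage is a toy implementation backed by a map, useful for tests.
type inMemoryStorage struct {
	mu      sync.RWMutex
	records map[string][]byte
}

func newInMemoryStorage() *inMemoryStorage {
	return &inMemoryStorage{records: make(map[string][]byte)}
}

func (s *inMemoryStorage) Get(_ context.Context, key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	value, ok := s.records[key]
	return value, ok
}

func (s *inMemoryStorage) Set(_ context.Context, key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records[key] = value
}

func (s *inMemoryStorage) Delete(_ context.Context, key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.records, key)
}

func main() {
	var storage distributedStorage = newInMemoryStorage()
	ctx := context.Background()
	storage.Set(ctx, "item-1", []byte(`{"value":"foo"}`))
	if value, ok := storage.Get(ctx, "item-1"); ok {
		fmt.Println(string(value))
	}
	storage.Delete(ctx, "item-1")
}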
6 changes: 5 additions & 1 deletion cache_test.go
@@ -41,7 +41,10 @@ func TestShardDistribution(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
recorder := newTestMetricsRecorder(tc.numShards)
c := sturdyc.New[string](tc.capacity, tc.numShards, time.Hour, 5, sturdyc.WithMetrics(recorder))
c := sturdyc.New[string](tc.capacity, tc.numShards, time.Hour, 5,
sturdyc.WithNoContinuousEvictions(),
sturdyc.WithMetrics(recorder),
)
for i := 0; i < tc.capacity; i++ {
key := randKey(tc.keyLength)
c.Set(key, "value")
@@ -144,6 +147,7 @@ func TestForcedEvictions(t *testing.T) {
time.Hour,
tc.evictionPercentage,
sturdyc.WithMetrics(recorder),
sturdyc.WithNoContinuousEvictions(),
)

// Start by filling the sturdyc.
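The tests above still wire metrics in through sturdyc.WithMetrics(recorder), while cache.go (earlier in this diff) drops its local MetricsRecorder interface in favor of a DistributedMetricsRecorder config field — presumably relocating the interface rather than deleting it. Assuming the method set shown in the removed block is unchanged, a minimal recorder that counts hits and misses could look like the sketch below; whether WithMetrics still accepts this narrower interface after this commit, or expects the extended distributed variant, is not visible here:

package main

import (
	"fmt"
	"sync/atomic"
)

// countingRecorder implements the method set of the MetricsRecorder interface
// that this commit removes from cache.go (CacheHit, CacheMiss, evictions, and
// so on). Only hits and misses are counted here; the remaining hooks are no-ops.
type countingRecorder struct {
	hits   atomic.Int64
	misses atomic.Int64
}

func (r *countingRecorder) CacheHit()                  { r.hits.Add(1) }
func (r *countingRecorder) CacheMiss()                 { r.misses.Add(1) }
func (r *countingRecorder) Eviction()                  {}
func (r *countingRecorder) ForcedEviction()            {}
func (r *countingRecorder) EntriesEvicted(int)         {}
func (r *countingRecorder) ShardIndex(int)             {}
func (r *countingRecorder) CacheBatchRefreshSize(int)  {}
func (r *countingRecorder) ObserveCacheSize(func() int) {}

func main() {
	recorder := &countingRecorder{}
	recorder.CacheHit()
	recorder.CacheHit()
	recorder.CacheMiss()
	fmt.Printf("hits=%d misses=%d\n", recorder.hits.Load(), recorder.misses.Load())
}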