Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable edge-level kill switch to drop messages when buffer is full, for the non-reduce forwarder #634

Merged
merged 7 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address comments
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Mar 28, 2023
commit 37abe9b75ebc4b2d76dd89b7c7dd9d8988a59975
24 changes: 13 additions & 11 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,16 @@ Description
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.BufferFullWritingStrategy">
BufferFullWritingStrategy (<code>string</code> alias)
</p>
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.Edge">Edge</a>)
</p>
<p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.BufferServiceConfig">
BufferServiceConfig
</h3>
Expand Down Expand Up @@ -918,7 +928,9 @@ ignored when the “to” vertex is not a reduce vertex.
</tr>
<tr>
<td>
<code>onFull</code></br> <em> string </em>
<code>onFull</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.BufferFullWritingStrategy">
BufferFullWritingStrategy </a> </em>
</td>
<td>
<em>(Optional)</em>
Expand Down Expand Up @@ -2719,16 +2731,6 @@ Auth information
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.OnFullWritingStrategy">
OnFullWritingStrategy (<code>string</code> alias)
</p>
</h3>
<p>
<p>
OnFullWritingStrategy is an edge-level specification to define the
writing behaviour when the ToBuffer is full
</p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.PBQStorage">
PBQStorage
</h3>
Expand Down
8 changes: 3 additions & 5 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)

// OnFullWritingStrategy is an edge-level specification to define the writing behaviour when the ToBuffer is full
type OnFullWritingStrategy string
type BufferFullWritingStrategy string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to edge_types.go?


const (
Project = "numaflow"
Expand Down Expand Up @@ -125,9 +124,8 @@ const (
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500

// Edge-level specification to define the writing behaviour when the ToBuffer is full
RetryUntilSuccess OnFullWritingStrategy = "retryUntilSuccess"
DiscardLatest OnFullWritingStrategy = "discardLatest"
RetryUntilSuccess BufferFullWritingStrategy = "retryUntilSuccess"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these as well?

DiscardLatest BufferFullWritingStrategy = "discardLatest"

// Auto scaling
DefaultLookbackSeconds = 180 // Default lookback seconds for calculating avg rate and pending
Expand Down
10 changes: 4 additions & 6 deletions pkg/apis/numaflow/v1alpha1/edge_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Edge struct {
// if not provided, the default value is set to "retryUntilSuccess"
// +kubebuilder:validation:Enum=retryUntilSuccess;discardLatest
// +optional
OnFull *string `json:"onFull,omitempty" protobuf:"bytes,6,opt,name=onFull"`
OnFull *BufferFullWritingStrategy `json:"onFull,omitempty" protobuf:"bytes,6,opt,name=onFull"`
}

type ForwardConditions struct {
Expand All @@ -53,15 +53,13 @@ type EdgeLimits struct {
BufferUsageLimit *uint32 `json:"bufferUsageLimit,omitempty" protobuf:"varint,2,opt,name=bufferUsageLimit"`
}

func (e Edge) OnFullWritingStrategy() OnFullWritingStrategy {
func (e Edge) BufferFullWritingStrategy() BufferFullWritingStrategy {
if e.OnFull == nil {
return RetryUntilSuccess
}
switch *e.OnFull {
whynowy marked this conversation as resolved.
Show resolved Hide resolved
case "retryUntilSuccess":
return RetryUntilSuccess
case "discardLatest":
return DiscardLatest
case RetryUntilSuccess, DiscardLatest:
return *e.OnFull
default:
return RetryUntilSuccess
}
Expand Down
719 changes: 360 additions & 359 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package forward

import (
"context"
"errors"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -490,7 +491,7 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBuffer is
for idx, msg := range messages {
if err := errs[idx]; err != nil {
// ATM there are no user defined errors during write, all are InternalErrors.
if _, ok := err.(isb.NoRetryableBufferWriteErr); ok {
if ok := errors.As(err, &isb.NoRetryableBufferWriteErr{}); ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor - removing ok?

// If toBuffer returns us a NoRetryableBufferWriteErr, we drop the message.
dropBytes += float64(len(msg.Payload))
} else {
Expand Down
12 changes: 6 additions & 6 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ func TestSourceInterStepDataForward(t *testing.T) {
<-stopped
}

// TestWriteToBufferError_OnFullWritingStrategyIsRetryUntilSuccess explicitly tests the case of retrying failed messages
func TestWriteToBufferError_OnFullWritingStrategyIsRetryUntilSuccess(t *testing.T) {
// TestWriteToBufferError_BufferFullWritingStrategyIsRetryUntilSuccess explicitly tests the case of retrying failed messages
func TestWriteToBufferError_BufferFullWritingStrategyIsRetryUntilSuccess(t *testing.T) {
fromStep := simplebuffer.NewInMemoryBuffer("from", 25)
to1 := simplebuffer.NewInMemoryBuffer("to1", 10)
toSteps := map[string]isb.BufferWriter{
Expand Down Expand Up @@ -673,10 +673,10 @@ func TestWriteToBufferError_OnFullWritingStrategyIsRetryUntilSuccess(t *testing.
<-stopped
}

// TestWriteToBufferError_OnFullWritingStrategyIsDiscardLatest explicitly tests the case of dropping messages when buffer is full
func TestWriteToBufferError_OnFullWritingStrategyIsDiscardLatest(t *testing.T) {
// TestWriteToBufferError_BufferFullWritingStrategyIsDiscardLatest explicitly tests the case of dropping messages when buffer is full
func TestWriteToBufferError_BufferFullWritingStrategyIsDiscardLatest(t *testing.T) {
fromStep := simplebuffer.NewInMemoryBuffer("from", 25)
to1 := simplebuffer.NewInMemoryBuffer("to1", 10, simplebuffer.WithOnFullWritingStrategy(dfv1.DiscardLatest))
to1 := simplebuffer.NewInMemoryBuffer("to1", 10, simplebuffer.WithBufferFullWritingStrategy(dfv1.DiscardLatest))
toSteps := map[string]isb.BufferWriter{
"to1": to1,
}
Expand All @@ -702,7 +702,7 @@ func TestWriteToBufferError_OnFullWritingStrategyIsDiscardLatest(t *testing.T) {
messageToStep["to1"] = make([]isb.Message, 0)
messageToStep["to1"] = append(messageToStep["to1"], writeMessages[0:11]...)
_, err = f.writeToBuffers(ctx, messageToStep)
// although we are writing 11 messages to a buffer of size 10, since we specify actionOnFull as DiscardLatest,
// although we are writing 11 messages to a buffer of size 10, since we specify BufferFullWritingStrategy as DiscardLatest,
// the writeToBuffers() call should return no error.
assert.Nil(t, err)
// stop will cancel the contexts and therefore the forwarder stops without waiting
Expand Down
20 changes: 10 additions & 10 deletions pkg/isb/stores/jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ type writeOptions struct {
refreshInterval time.Duration
// useWriteInfoAsRate indicates whether to check the write sequence for rate calculation
useWriteInfoAsRate bool
// onFullWritingStrategy is the writing strategy when buffer is full
onFullWritingStrategy dfv1.OnFullWritingStrategy
// bufferFullWritingStrategy is the writing strategy when buffer is full
bufferFullWritingStrategy dfv1.BufferFullWritingStrategy
}

func defaultWriteOptions() *writeOptions {
return &writeOptions{
maxLength: dfv1.DefaultBufferLength,
bufferUsageLimit: dfv1.DefaultBufferUsageLimit,
refreshInterval: 1 * time.Second,
useWriteInfoAsRate: false,
onFullWritingStrategy: dfv1.RetryUntilSuccess,
maxLength: dfv1.DefaultBufferLength,
bufferUsageLimit: dfv1.DefaultBufferUsageLimit,
refreshInterval: 1 * time.Second,
useWriteInfoAsRate: false,
bufferFullWritingStrategy: dfv1.RetryUntilSuccess,
}
}

Expand Down Expand Up @@ -80,10 +80,10 @@ func WithUsingWriteInfoAsRate(yes bool) WriteOption {
}
}

// WithOnFullWritingStrategy sets the writing strategy when buffer is full
func WithOnFullWritingStrategy(s dfv1.OnFullWritingStrategy) WriteOption {
// WithBufferFullWritingStrategy sets the writing strategy when buffer is full
func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) WriteOption {
return func(o *writeOptions) error {
o.onFullWritingStrategy = s
o.bufferFullWritingStrategy = s
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (jw *jetStreamWriter) Write(ctx context.Context, messages []isb.Message) ([
jw.log.Debugw("Is full")
isbFull.With(map[string]string{"buffer": jw.GetName()}).Inc()
// when buffer is full, we need to decide whether to discard the message or not.
switch jw.opts.onFullWritingStrategy {
switch jw.opts.bufferFullWritingStrategy {
case v1alpha1.DiscardLatest:
// user explicitly wants to discard the message when buffer if full.
// return no retryable error as a callback to let caller know that the message is discarded.
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
addStream(t, js, streamName)
defer deleteStream(js, streamName)

bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName, WithMaxLength(10), WithBufferUsageLimit(0.2), WithOnFullWritingStrategy(dfv1.DiscardLatest))
bw, err := NewJetStreamBufferWriter(ctx, defaultJetStreamClient, streamName, streamName, streamName, WithMaxLength(10), WithBufferUsageLimit(0.2), WithBufferFullWritingStrategy(dfv1.DiscardLatest))
assert.NoError(t, err)
jw, _ := bw.(*jetStreamWriter)
timeout := time.After(10 * time.Second)
Expand Down
18 changes: 9 additions & 9 deletions pkg/isb/stores/redis/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type options struct {
bufferUsageLimit float64
// refreshBufferWriteInfo is used to determine if we refresh buffer write info
refreshBufferWriteInfo bool
// onFullWritingStrategy is the writing strategy when buffer is full
onFullWritingStrategy dfv1.OnFullWritingStrategy
// bufferFullWritingStrategy is the writing strategy when buffer is full
bufferFullWritingStrategy dfv1.BufferFullWritingStrategy
}

// Option to apply different options
Expand Down Expand Up @@ -145,14 +145,14 @@ func WithRefreshBufferWriteInfo(r bool) Option {
return refreshBufferWriteInfo(r)
}

// WithOnFullWritingStrategy option
type onFullWritingStrategy dfv1.OnFullWritingStrategy
// WithBufferFullWritingStrategy option
type bufferFullWritingStrategy dfv1.BufferFullWritingStrategy

func (s onFullWritingStrategy) apply(o *options) {
o.onFullWritingStrategy = dfv1.OnFullWritingStrategy(s)
func (s bufferFullWritingStrategy) apply(o *options) {
o.bufferFullWritingStrategy = dfv1.BufferFullWritingStrategy(s)
}

// WithOnFullWritingStrategy sets the OnFullWritingStrategy
func WithOnFullWritingStrategy(s dfv1.OnFullWritingStrategy) Option {
return onFullWritingStrategy(s)
// WithBufferFullWritingStrategy sets the BufferFullWritingStrategy
func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) Option {
return bufferFullWritingStrategy(s)
}
16 changes: 8 additions & 8 deletions pkg/isb/stores/redis/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ var _ isb.BufferWriter = (*BufferWrite)(nil)
// NewBufferWrite returns a new redis queue writer.
func NewBufferWrite(ctx context.Context, client *redisclient.RedisClient, name string, group string, opts ...Option) isb.BufferWriter {
options := &options{
pipelining: true,
infoRefreshInterval: time.Second,
lagDuration: time.Duration(0),
maxLength: dfv1.DefaultBufferLength,
bufferUsageLimit: dfv1.DefaultBufferUsageLimit,
refreshBufferWriteInfo: true,
onFullWritingStrategy: dfv1.RetryUntilSuccess,
pipelining: true,
infoRefreshInterval: time.Second,
lagDuration: time.Duration(0),
maxLength: dfv1.DefaultBufferLength,
bufferUsageLimit: dfv1.DefaultBufferUsageLimit,
refreshBufferWriteInfo: true,
bufferFullWritingStrategy: dfv1.RetryUntilSuccess,
}

for _, o := range opts {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O
isbIsFull.With(labels).Inc()

// when buffer is full, we need to decide whether to discard the message or not.
switch bw.onFullWritingStrategy {
switch bw.bufferFullWritingStrategy {
case dfv1.DiscardLatest:
// user explicitly wants to discard the message when buffer if full.
// return no retryable error as a callback to let caller know that the message is discarded.
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ func TestRedisQWrite_WithInfoRefreshInterval(t *testing.T) {
}
}

func TestRedisQWrite_WithInfoRefreshInterval_DiscardMessageOnFull(t *testing.T) {
func TestRedisQWrite_WithInfoRefreshInterval_WithBufferFullWritingStrategyIsDiscardLatest(t *testing.T) {
client := redisclient.NewRedisClient(redisOptions)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
stream := "withInfoRefreshInterval"
count := int64(10)
group := "withInfoRefreshInterval-group"
rqw, _ := NewBufferWrite(ctx, client, stream, group, WithInfoRefreshInterval(2*time.Millisecond), WithLagDuration(2*time.Millisecond), WithMaxLength(10), WithOnFullWritingStrategy(dfv1.DiscardLatest)).(*BufferWrite)
rqw, _ := NewBufferWrite(ctx, client, stream, group, WithInfoRefreshInterval(2*time.Millisecond), WithLagDuration(2*time.Millisecond), WithMaxLength(10), WithBufferFullWritingStrategy(dfv1.DiscardLatest)).(*BufferWrite)
err := client.CreateStreamGroup(ctx, rqw.GetStreamName(), group, redisclient.ReadFromEarliest)
if err != nil {
t.Fatalf("error creating consumer group: %s", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/isb/stores/simplebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type elem struct {
func NewInMemoryBuffer(name string, size int64, opts ...Option) *InMemoryBuffer {

bufferOptions := &options{
readTimeOut: time.Second, // default read time out
onFullWritingStrategy: v1alpha1.RetryUntilSuccess, // default on full writing strategy
readTimeOut: time.Second, // default read time out
bufferFullWritingStrategy: v1alpha1.RetryUntilSuccess, // default buffer full writing strategy
}

for _, o := range opts {
Expand Down Expand Up @@ -147,7 +147,7 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb
// access buffer via lock
b.rwlock.Unlock()
} else {
switch b.options.onFullWritingStrategy {
switch b.options.bufferFullWritingStrategy {
case v1alpha1.DiscardLatest:
errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: "Buffer full!"}
default:
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func TestNewSimpleBuffer(t *testing.T) {
assert.Equal(t, true, sb.IsFull())
}

func TestNewSimpleBuffer_DiscardOnFull(t *testing.T) {
func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) {
count := int64(3)
sb := NewInMemoryBuffer("test", 2, WithOnFullWritingStrategy(v1alpha1.DiscardLatest))
sb := NewInMemoryBuffer("test", 2, WithBufferFullWritingStrategy(v1alpha1.DiscardLatest))
ctx := context.Background()
assert.NotEmpty(t, sb.String())
assert.Equal(t, sb.IsEmpty(), true)
Expand Down
10 changes: 5 additions & 5 deletions pkg/isb/stores/simplebuffer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
type options struct {
// readTimeOut is the timeout needed for read timeout
readTimeOut time.Duration
// onFullWritingStrategy is the writing strategy when buffer is full
onFullWritingStrategy dfv1.OnFullWritingStrategy
// bufferFullWritingStrategy is the writing strategy when buffer is full
bufferFullWritingStrategy dfv1.BufferFullWritingStrategy
}

type Option func(options *options) error
Expand All @@ -40,10 +40,10 @@ func WithReadTimeOut(timeout time.Duration) Option {
}
}

// WithOnFullWritingStrategy sets the writing strategy when buffer is full
func WithOnFullWritingStrategy(s dfv1.OnFullWritingStrategy) Option {
// WithBufferFullWritingStrategy sets the writing strategy when buffer is full
func WithBufferFullWritingStrategy(s dfv1.BufferFullWritingStrategy) Option {
return func(o *options) error {
o.onFullWritingStrategy = s
o.bufferFullWritingStrategy = s
return nil
}
}
4 changes: 2 additions & 2 deletions pkg/sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
case dfv1.ISBSvcTypeRedis:
for _, e := range sp.VertexInstance.Vertex.Spec.ToEdges {
writeOpts := []redisisb.Option{
redisisb.WithOnFullWritingStrategy(e.OnFullWritingStrategy()),
redisisb.WithBufferFullWritingStrategy(e.BufferFullWritingStrategy()),
}
if x := e.Limits; x != nil && x.BufferMaxLength != nil {
writeOpts = append(writeOpts, redisisb.WithMaxLength(int64(*x.BufferMaxLength)))
Expand Down Expand Up @@ -102,7 +102,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
for _, e := range sp.VertexInstance.Vertex.Spec.ToEdges {
writeOpts := []jetstreamisb.WriteOption{
jetstreamisb.WithUsingWriteInfoAsRate(true),
jetstreamisb.WithOnFullWritingStrategy(e.OnFullWritingStrategy()),
jetstreamisb.WithBufferFullWritingStrategy(e.BufferFullWritingStrategy()),
}
if x := e.Limits; x != nil && x.BufferMaxLength != nil {
writeOpts = append(writeOpts, jetstreamisb.WithMaxLength(int64(*x.BufferMaxLength)))
Expand Down
4 changes: 2 additions & 2 deletions pkg/udf/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func buildRedisBufferIO(ctx context.Context, fromBufferName string, vertexInstan
for _, e := range vertexInstance.Vertex.Spec.ToEdges {

writeOpts := []redisisb.Option{
redisisb.WithOnFullWritingStrategy(e.OnFullWritingStrategy()),
redisisb.WithBufferFullWritingStrategy(e.BufferFullWritingStrategy()),
}
if x := e.Limits; x != nil && x.BufferMaxLength != nil {
writeOpts = append(writeOpts, redisisb.WithMaxLength(int64(*x.BufferMaxLength)))
Expand Down Expand Up @@ -76,7 +76,7 @@ func buildJetStreamBufferIO(ctx context.Context, fromBufferName string, vertexIn

for _, e := range vertexInstance.Vertex.Spec.ToEdges {
writeOpts := []jetstreamisb.WriteOption{
jetstreamisb.WithOnFullWritingStrategy(e.OnFullWritingStrategy()),
jetstreamisb.WithBufferFullWritingStrategy(e.BufferFullWritingStrategy()),
}
if x := e.Limits; x != nil && x.BufferMaxLength != nil {
writeOpts = append(writeOpts, jetstreamisb.WithMaxLength(int64(*x.BufferMaxLength)))
Expand Down