-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enable edge-level kill switch to drop messages when buffer is full, for the non-reduce forwarder #634
Conversation
Signed-off-by: Keran Yang <[email protected]>
pkg/sinks/udsink/sink.go
Outdated
@@ -78,7 +78,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchW | |||
return nil, fmt.Errorf("failed to create gRPC client, %w", err) | |||
} | |||
s.udsink = udsink | |||
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...) | |||
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward.All, map[string]string{vertex.GetToBuffers()[0].Name: dfv1.RetryUntilSuccess}, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to make the new arg an option?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed by moving the OnFullWritingStrategy to the writer itself. Was able to bring changed files from 53 to 42.
Signed-off-by: Keran Yang <[email protected]>
Next Step
|
pkg/apis/numaflow/v1alpha1/const.go
Outdated
@@ -122,6 +125,10 @@ const ( | |||
DefaultBufferUsageLimit = 0.8 | |||
DefaultReadBatchSize = 500 | |||
|
|||
// Edge-level specification to define the writing behaviour when the ToBuffer is full |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor - These definitions could stay in the edge_types.go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
pkg/apis/numaflow/v1alpha1/const.go
Outdated
@@ -24,6 +24,8 @@ import ( | |||
"k8s.io/apimachinery/pkg/api/resource" | |||
) | |||
|
|||
type BufferFullWritingStrategy string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to edge_types.go?
pkg/forward/forward.go
Outdated
for idx, msg := range messages { | ||
if err := errs[idx]; err != nil { | ||
// ATM there are no user defined errors during write, all are InternalErrors. | ||
if ok := errors.As(err, &isb.NoRetryableBufferWriteErr{}); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor - removing ok
?
Signed-off-by: Keran Yang <[email protected]>
pkg/apis/numaflow/v1alpha1/const.go
Outdated
@@ -122,6 +122,9 @@ const ( | |||
DefaultBufferUsageLimit = 0.8 | |||
DefaultReadBatchSize = 500 | |||
|
|||
RetryUntilSuccess BufferFullWritingStrategy = "retryUntilSuccess" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these as well?
Signed-off-by: Keran Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…ull, for the non-reduce forwarder (#634) Introduce a new pipeline edge-level specification to define write behaviours when a buffer is full. Signed-off-by: Keran Yang <[email protected]>
map
part of issue kill switch to drop messages when buffer.isFull == true #524I will create follow up PRs for reduce forwarder once this one gets approved.