Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable edge-level kill switch to drop messages when buffer is full, for the non-reduce forwarder #634

Merged
merged 7 commits into from
Mar 28, 2023

Conversation

KeranYang
Copy link
Member

@KeranYang KeranYang commented Mar 24, 2023

I will create follow up PRs for reduce forwarder once this one gets approved.

@KeranYang KeranYang marked this pull request as ready for review March 24, 2023 21:18
Makefile Show resolved Hide resolved
pkg/apis/numaflow/v1alpha1/edge_types.go Outdated Show resolved Hide resolved
pkg/forward/forward.go Outdated Show resolved Hide resolved
pkg/forward/forward.go Outdated Show resolved Hide resolved
pkg/forward/forward.go Show resolved Hide resolved
pkg/forward/forward.go Show resolved Hide resolved
pkg/isb/interfaces.go Outdated Show resolved Hide resolved
@@ -78,7 +78,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchW
return nil, fmt.Errorf("failed to create gRPC client, %w", err)
}
s.udsink = udsink
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward.All, map[string]string{vertex.GetToBuffers()[0].Name: dfv1.RetryUntilSuccess}, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to make the new arg an option?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed by moving the OnFullWritingStrategy to the writer itself. Was able to bring changed files from 53 to 42.

pkg/apis/numaflow/v1alpha1/const.go Outdated Show resolved Hide resolved
Signed-off-by: Keran Yang <[email protected]>
@KeranYang KeranYang requested a review from whynowy March 27, 2023 22:13
@KeranYang
Copy link
Member Author

Next Step

  • Apply same logic to reduce.
  • Add doc.

@@ -122,6 +125,10 @@ const (
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500

// Edge-level specification to define the writing behaviour when the ToBuffer is full
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor - These definitions could stay in the edge_types.go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

pkg/apis/numaflow/v1alpha1/edge_types.go Show resolved Hide resolved
pkg/forward/forward.go Outdated Show resolved Hide resolved
pkg/apis/numaflow/v1alpha1/edge_types.go Outdated Show resolved Hide resolved
pkg/apis/numaflow/v1alpha1/const.go Outdated Show resolved Hide resolved
@@ -24,6 +24,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)

type BufferFullWritingStrategy string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to edge_types.go?

for idx, msg := range messages {
if err := errs[idx]; err != nil {
// ATM there are no user defined errors during write, all are InternalErrors.
if ok := errors.As(err, &isb.NoRetryableBufferWriteErr{}); ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor - removing ok?

.
Signed-off-by: Keran Yang <[email protected]>
@@ -122,6 +122,9 @@ const (
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500

RetryUntilSuccess BufferFullWritingStrategy = "retryUntilSuccess"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these as well?

.
Signed-off-by: Keran Yang <[email protected]>
@KeranYang KeranYang requested a review from whynowy March 28, 2023 19:01
Copy link
Member

@whynowy whynowy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@KeranYang KeranYang merged commit 927b95c into numaproj:main Mar 28, 2023
whynowy pushed a commit that referenced this pull request Apr 3, 2023
…ull, for the non-reduce forwarder (#634)

Introduce a new pipeline edge-level specification to define write behaviours when a buffer is full.

Signed-off-by: Keran Yang <[email protected]>
@vigith vigith linked an issue Apr 12, 2023 that may be closed by this pull request
@KeranYang KeranYang deleted the map-on-full branch August 24, 2023 17:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

kill switch to drop messages when buffer.isFull == true
2 participants