Skip to content

Commit

Permalink
feat: enable edge-level kill switch to drop messages when buffer is f…
Browse files Browse the repository at this point in the history
…ull, for the non-reduce forwarder (#634)

Introduce a new pipeline edge-level specification to define write behaviours when a buffer is full.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Mar 28, 2023
1 parent c03e54f commit 927b95c
Show file tree
Hide file tree
Showing 41 changed files with 1,037 additions and 430 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 6
max-parallel: 7
matrix:
driver: [jetstream]
case: [e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e]
case: [e2e-suite-1, e2e-suite-2, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ test-coverage-with-isb:
test-code:
go test -tags=isb_redis -race -v $(shell go list ./... | grep -v /vendor/ | grep -v /numaflow/test/) -timeout 120s

test-e2e:
test-e2e-suite-1:
test-e2e-suite-2:
test-kafka-e2e:
test-http-e2e:
test-nats-e2e:
Expand Down Expand Up @@ -135,7 +136,7 @@ Test%:
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
cat test/manifests/e2e-api-pod.yaml | sed '[email protected]/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:$(BASE_VERSION)/:$(VERSION)/' | kubectl -n numaflow-system apply -f -
-go test -v -timeout 10m -count 1 --tags test -p 1 ./test/e2e -run='.*/$*'
-go test -v -timeout 10m -count 1 --tags test -p 1 ./test/e2e-suite-1 -run='.*/$*'
$(MAKE) cleanup-e2e

.PHONY: ui-build
Expand Down
6 changes: 5 additions & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -17664,8 +17664,12 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.EdgeLimits",
"description": "Limits define the limitations such as buffer read batch size for the edge, will override pipeline level settings."
},
"onFull": {
"description": "OnFull specifies the behaviour for the write actions when the inter step buffer is full. There are currently two options, retryUntilSuccess and discardLatest. if not provided, the default value is set to \"retryUntilSuccess\"",
"type": "string"
},
"parallelism": {
"description": "Parallelism is only effective when the \"to\" vertex is a reduce vertex, if it's provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"description": "Parallelism is only effective when the \"to\" vertex is a reduce vertex, if it's not provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"format": "int32",
"type": "integer"
},
Expand Down
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -17673,8 +17673,12 @@
"description": "Limits define the limitations such as buffer read batch size for the edge, will override pipeline level settings.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.EdgeLimits"
},
"onFull": {
"description": "OnFull specifies the behaviour for the write actions when the inter step buffer is full. There are currently two options, retryUntilSuccess and discardLatest. if not provided, the default value is set to \"retryUntilSuccess\"",
"type": "string"
},
"parallelism": {
"description": "Parallelism is only effective when the \"to\" vertex is a reduce vertex, if it's provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"description": "Parallelism is only effective when the \"to\" vertex is a reduce vertex, if it's not provided, the default value is set to \"1\". Parallelism is ignored when the \"to\" vertex is not a reduce vertex.",
"type": "integer",
"format": "int32"
},
Expand Down
5 changes: 5 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down
10 changes: 10 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down Expand Up @@ -2802,6 +2807,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down
15 changes: 15 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down Expand Up @@ -8438,6 +8443,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down Expand Up @@ -10628,6 +10638,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down
15 changes: 15 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down Expand Up @@ -8438,6 +8443,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down Expand Up @@ -10628,6 +10638,11 @@ spec:
format: int32
type: integer
type: object
onFull:
enum:
- retryUntilSuccess
- discardLatest
type: string
parallelism:
format: int32
type: integer
Expand Down
28 changes: 27 additions & 1 deletion docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,16 @@ Description
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.BufferFullWritingStrategy">
BufferFullWritingStrategy (<code>string</code> alias)
</p>
</h3>
<p>
(<em>Appears on:</em>
<a href="#numaflow.numaproj.io/v1alpha1.Edge">Edge</a>)
</p>
<p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.BufferServiceConfig">
BufferServiceConfig
</h3>
Expand Down Expand Up @@ -921,11 +931,27 @@ edge, will override pipeline level settings.
<em>(Optional)</em>
<p>
Parallelism is only effective when the “to” vertex is a reduce vertex,
if it’s provided, the default value is set to “1”. Parallelism is
if it’s not provided, the default value is set to “1”. Parallelism is
ignored when the “to” vertex is not a reduce vertex.
</p>
</td>
</tr>
<tr>
<td>
<code>onFull</code></br> <em>
<a href="#numaflow.numaproj.io/v1alpha1.BufferFullWritingStrategy">
BufferFullWritingStrategy </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
OnFull specifies the behaviour for the write actions when the inter step
buffer is full. There are currently two options, retryUntilSuccess and
discardLatest. if not provided, the default value is set to
“retryUntilSuccess”
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.EdgeLimits">
Expand Down
27 changes: 26 additions & 1 deletion pkg/apis/numaflow/v1alpha1/edge_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ type Edge struct {
// +optional
Limits *EdgeLimits `json:"limits,omitempty" protobuf:"bytes,4,opt,name=limits"`
// Parallelism is only effective when the "to" vertex is a reduce vertex,
// if it's provided, the default value is set to "1".
// if it's not provided, the default value is set to "1".
// Parallelism is ignored when the "to" vertex is not a reduce vertex.
// +optional
Parallelism *int32 `json:"parallelism" protobuf:"bytes,5,opt,name=parallelism"`
// OnFull specifies the behaviour for the write actions when the inter step buffer is full.
// There are currently two options, retryUntilSuccess and discardLatest.
// if not provided, the default value is set to "retryUntilSuccess"
// +kubebuilder:validation:Enum=retryUntilSuccess;discardLatest
// +optional
OnFull *BufferFullWritingStrategy `json:"onFull,omitempty" protobuf:"bytes,6,opt,name=onFull"`
}

type ForwardConditions struct {
Expand All @@ -46,3 +52,22 @@ type EdgeLimits struct {
// +optional
BufferUsageLimit *uint32 `json:"bufferUsageLimit,omitempty" protobuf:"varint,2,opt,name=bufferUsageLimit"`
}

func (e Edge) BufferFullWritingStrategy() BufferFullWritingStrategy {
if e.OnFull == nil {
return RetryUntilSuccess
}
switch *e.OnFull {
case RetryUntilSuccess, DiscardLatest:
return *e.OnFull
default:
return RetryUntilSuccess
}
}

type BufferFullWritingStrategy string

const (
RetryUntilSuccess BufferFullWritingStrategy = "retryUntilSuccess"
DiscardLatest BufferFullWritingStrategy = "discardLatest"
)
Loading

0 comments on commit 927b95c

Please sign in to comment.