[prism] Support Stateful SDFs (state and timers) #32139

Open
Tracked by #29650
lostluck opened this issue Aug 9, 2024 · 1 comment

lostluck commented Aug 9, 2024

Prism doesn't currently support Stateful Splittable DoFns correctly, that is, SDFs that also use State and Timers. This issue tracks whether that changes, and outlines the problem so that an error message can link here.

The recommended workaround is to split the Stateful SDF into two transforms: an SDF followed by a Stateful DoFn. Ideally, the KV key into the SDF is different from the key into the Stateful DoFn, so that the fusion break implied by the Stateful DoFn doesn't make the pipeline less efficient. (E.g., if the SDF is keyed by a partition or a filename, it shouldn't emit values with that same key to the Stateful DoFn. That limits parallelism anyway.)
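To make the re-keying in that workaround concrete, here is a minimal sketch in plain Go. It deliberately does not use the Beam SDK: `Record`, `KV`, `rekey`, and `statefulSum` are hypothetical names made up for illustration, and the loop over a slice only stands in for the two pipeline stages.

```go
package main

import "fmt"

// Record is a hypothetical value that the SDF reads out of one file.
type Record struct {
	File   string // the key the SDF itself was invoked with
	UserID string // a per-record field, used to key the stateful step
	Amount int
}

// KV stands in for a Beam key-value pair (illustration only).
type KV struct {
	Key   string
	Value Record
}

// rekey models what the SDF stage emits: records keyed by a per-record
// field rather than by the file (or partition) they were read from.
func rekey(recs []Record) []KV {
	out := make([]KV, 0, len(recs))
	for _, r := range recs {
		out = append(out, KV{Key: r.UserID, Value: r})
	}
	return out
}

// statefulSum models the separate Stateful DoFn: it keeps one running
// total per key, which is the kind of per-key state the SDF cannot hold.
func statefulSum(kvs []KV) map[string]int {
	state := map[string]int{}
	for _, kv := range kvs {
		state[kv.Key] += kv.Value.Amount
	}
	return state
}

func main() {
	recs := []Record{
		{"a.log", "u1", 5},
		{"b.log", "u1", 3},
		{"a.log", "u2", 2},
	}
	fmt.Println(statefulSum(rekey(recs)))
}
```

Because the stateful step is keyed by `UserID` rather than `File`, its parallelism is bounded by the number of distinct users, not by the number of input files.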


The difficulty is that what we execute for an SDF is not based on the user's key, due to the rewriting an SDF goes through. The executed KV for a ProcessSizedElementAndRestriction is a KV(KV(element, restriction), float64), where element may itself be a user KV whose key is the user's key. (See https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go#L126)

I believe the user expectation would be to base state on their Key, and not the full Element+Restriction pair, or specifically the Key+Value+Restriction triple.
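As a rough illustration of the mismatch, the nested shape can be modelled with plain Go structs. This is a sketch under assumptions, not Prism's actual representation: all type and function names here (`UserKV`, `Restriction`, `SizedElement`, `outerKey`, `userKey`) are hypothetical, and the restriction is assumed to be a simple offset range.

```go
package main

import "fmt"

// UserKV is a hypothetical user element: the KV the pipeline author wrote.
type UserKV struct {
	Key   string
	Value string
}

// Restriction is assumed to be an offset range for this sketch.
type Restriction struct{ Start, End int64 }

// ElementRestriction mirrors the inner KV(element, restriction).
type ElementRestriction struct {
	Element     UserKV
	Restriction Restriction
}

// SizedElement mirrors KV(KV(element, restriction), float64).
type SizedElement struct {
	ElmRest ElementRestriction
	Size    float64
}

// outerKey is the "key" a runner gets if it treats the top-level KV
// naively: the whole Key+Value+Restriction triple.
func outerKey(e SizedElement) ElementRestriction { return e.ElmRest }

// userKey digs through the nesting to the key the user actually chose.
func userKey(e SizedElement) string { return e.ElmRest.Element.Key }

func main() {
	// Two splits of the same user element.
	a := SizedElement{ElementRestriction{UserKV{"file-1", "x"}, Restriction{0, 50}}, 50}
	b := SizedElement{ElementRestriction{UserKV{"file-1", "x"}, Restriction{50, 100}}, 50}

	fmt.Println(outerKey(a) == outerKey(b)) // false: restrictions differ
	fmt.Println(userKey(a) == userKey(b))   // true: same user key
}
```

Keying state by the outer value would give each split its own state cell, which is not what a user who wrote state against `file-1` would expect.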

So the element manager would need to be aware of this for extracting the key for the element, and processing only one bundle for the key at a time, and also not "break" the key handling for state elsewhere.
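The "one bundle per key at a time" requirement could look something like the following gate, sketched in plain Go. This is not how Prism's element manager is implemented; `keyGate`, `tryStart`, and `finish` are hypothetical names, and the key is assumed to be the extracted user key as a string.

```go
package main

import (
	"fmt"
	"sync"
)

// keyGate tracks which user keys currently have a bundle in flight.
type keyGate struct {
	mu       sync.Mutex
	inFlight map[string]bool
}

func newKeyGate() *keyGate {
	return &keyGate{inFlight: map[string]bool{}}
}

// tryStart reports whether a bundle for key may start now.
// On success the key is marked busy until finish is called.
func (g *keyGate) tryStart(key string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.inFlight[key] {
		return false
	}
	g.inFlight[key] = true
	return true
}

// finish releases the key so that its next bundle can be scheduled.
func (g *keyGate) finish(key string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.inFlight, key)
}

func main() {
	g := newKeyGate()
	fmt.Println(g.tryStart("file-1")) // true: first bundle for the key
	fmt.Println(g.tryStart("file-1")) // false: a split of the same key must wait
	fmt.Println(g.tryStart("file-2")) // true: other keys are unaffected
	g.finish("file-1")
	fmt.Println(g.tryStart("file-1")) // true: the key is free again
}
```

Note what the second call implies: two splits of the same element share a user key, so under this rule they cannot run concurrently, which undoes the parallelism that splitting was meant to provide.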

lostluck added a commit to lostluck/beam that referenced this issue Aug 9, 2024
jrmccluskey pushed a commit that referenced this issue Aug 9, 2024
* [#32139] Fail pipelines with Stateful SDFs.

* rm debug print

---------

Co-authored-by: lostluck
lostluck added a commit to lostluck/beam that referenced this issue Aug 12, 2024
lostluck self-assigned this Aug 15, 2024

lostluck commented
In discussion with Kenn, we concluded that although State and splitting look orthogonal, and thus compatible, from a user affordance perspective, they are fundamentally at odds. Kenn noted: "But in some sense they are, in fact, diametrical opposites. State is all about non-parallelism, while splitting is all about parallelism." I can't help but agree.

I now have a task to document the contraindication in the programming guide and protocol buffers.
