
[improve][pip] PIP-379: Key_Shared Draining Hashes for Improved Message Ordering #23309

Open · wants to merge 5 commits into master
Conversation

@lhotari (Member) commented Sep 14, 2024

Motivation

Read the pip-379.md document in this PR.

Mailing list discussion thread

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.0.0 milestone Sep 14, 2024
@lhotari lhotari self-assigned this Sep 14, 2024
@github-actions github-actions bot added the PIP label Sep 14, 2024
@lhotari lhotari changed the title PIP-379: Key_Shared Draining Hashes for Improved Message Ordering [improve][pip] PIP-379: Key_Shared Draining Hashes for Improved Message Ordering Sep 14, 2024
@github-actions github-actions bot added the doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. label Sep 14, 2024
@equanz (Contributor) left a comment:

Awesome.

I'd like to ask you some questions as I still don't understand some parts.


The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:

1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
Contributor:

What are the affected hashes when joining and leaving?
I can roughly guess, but I would like to check just to be sure.

Member Author:

They are the hashes where the assignment changes from an existing consumer to another consumer, which could be a new or an existing consumer.
Previously, there was an assumption that hash assignments would only move from existing consumers to new consumers. However, #23315 shows that this assumption doesn't hold.
In the PIP-379 design, there's no assumption that hash assignments couldn't change for existing consumers.

Member Author:

One of the challenges in getting a snapshot of the current hash assignments: "ConsistentHashingStickyKeyConsumerSelector's getConsumerKeyHashRanges doesn't return correct results" #23321

3. A reference counter tracks pending messages for each hash in the "draining hashes" set.
4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.
5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery.
6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer. The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.
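
The bookkeeping in the steps above could be sketched roughly like this (a minimal sketch; the class and method names are my own illustrative assumptions, not the actual Pulsar implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the draining-hashes bookkeeping described in the steps above.
class DrainingHashEntry {
    final long consumerId; // the consumer that still has pending messages for this hash
    int refCount;          // number of pending (unacknowledged) messages for this hash
    DrainingHashEntry(long consumerId) { this.consumerId = consumerId; }
}

class DrainingHashesTracker {
    private final Map<Integer, DrainingHashEntry> drainingHashes = new HashMap<>();

    // A hash of a pending message enters the draining set when hash ranges change;
    // called once per pending message with that hash (the reference counter).
    void addEntry(int hash, long consumerId) {
        drainingHashes.computeIfAbsent(hash, h -> new DrainingHashEntry(consumerId)).refCount++;
    }

    // A pending message was acknowledged, or its consumer disconnected.
    void reduceRefCount(int hash) {
        DrainingHashEntry entry = drainingHashes.get(hash);
        if (entry != null && --entry.refCount == 0) {
            // Fully drained: new deliveries for this hash may resume.
            drainingHashes.remove(hash);
        }
    }

    // Checked before delivering a new message. The "lazy" part: if the hash was
    // reassigned back to the draining consumer itself, unblock it immediately and
    // drop the entry instead of scanning the set on every range change.
    boolean shouldBlock(int hash, long targetConsumerId) {
        DrainingHashEntry entry = drainingHashes.get(hash);
        if (entry == null) {
            return false;
        }
        if (entry.consumerId == targetConsumerId) {
            drainingHashes.remove(hash); // same consumer: no ordering risk
            return false;
        }
        return true;
    }
}
```

In this sketch, `addEntry` would be called once per pending message of an affected hash when ranges change, `reduceRefCount` on each ack or disconnect, and `shouldBlock` before each delivery attempt.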
Contributor:

When the original consumer is disconnected, what consumer can receive these pending messages?

Member Author:

When a consumer disconnects, the messages return to the redelivery queue. There are currently some race conditions related to the timing of when these messages are delivered. This was discussed in #23307 (comment) .

In the current version of PIP-379, the "Updated contract of preserving ordering" is simply: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time." In the implementation of PIP-379, this is an invariant which must hold at all times. Since the invariant is simple, there's a much better chance of succeeding in holding it at all times in the system. I believe that taking this approach will help detect and resolve possible race conditions that would break this invariant.


The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:

1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
Contributor:

Could you tell me the draining hash data structure? Of course, a rough design is acceptable.

Member Author:

It could simply be Map<Integer, DrainingHashEntry>, where the key is the Integer hash (16 bytes object overhead + 4 bytes for the int) and the value is:

// 16 bytes object overhead
class DrainingHashEntry {
    // AtomicIntegerFieldUpdater could be used to update refCount without an additional wrapper object
    // 4 bytes
    volatile int refCount;
    // 8 bytes
    final long consumerId;
    ...
}

A rough estimate of the memory usage per entry is 80 bytes (key: 16+16+4, value: 16+16+4+8). I shared a memory usage estimate on the mailing list:

Memory usage estimates:

  • Worst case (all 64k hashes draining for a subscription): about 5MB
  • Practical case (less than 1000 hashes draining): less than 80 kilobytes
  • For 10,000 draining hashes: about 800 kB
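
A back-of-the-envelope check of these numbers, assuming the ~80 bytes per entry estimated above (the actual JVM footprint varies with object alignment and the map implementation):

```java
// Per-entry estimate from above: key boxing + map entry + int, value header + fields.
long perEntry = (16 + 16 + 4) + (16 + 16 + 4 + 8); // = 80 bytes
long worstCase = 65536 * perEntry;    // all 64k hashes draining: 5242880 bytes (~5 MB)
long practical = 1000 * perEntry;     // 80000 bytes (~80 kB)
long tenThousand = 10_000 * perEntry; // 800000 bytes (~800 kB)
```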

Contributor:

It should be defined in the proposal, not in a comment. Otherwise, reviewers might not understand how it works in detail.

Member Author:

@codelipenghui Thanks. I'll make sure to include this in the document.

1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.
3. A reference counter tracks pending messages for each hash in the "draining hashes" set.
4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.
Contributor:

Can this method be used when there are messages with the same key hash but different keys?
(I think it can, because the draining hash data structure holds the number of messages.)

Member Author:

The different keys wouldn't matter. This solution is for tracking at hash level. The invariant of "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time." would still hold.
It can be considered an implementation detail that the tracking is at hash level. This would be explained to users since the observability (stats/metrics) is at the hash level.

The broker doesn't unpack batch messages. It's the responsibility of the producer to produce batch messages where all messages in the batch contain the same key and therefore have the same hash.

Member Author:

The main consequence of hash collisions is that a single key is coupled to any other key with the same hash and would get blocked together.


Topic stats for the removed PIP-282 "recently joined consumers"/"last sent position" solution are removed:
- `lastSentPositionWhenJoining` field for each consumer
- `consumersAfterMarkDeletePosition` field for each Key_Shared subscription
Contributor:

This is a breaking change for existing users in how they troubleshoot Key_Shared subscription issues today.

Member Author:

There's a replacement. It's not a breaking change in that sense. That's also the reason why there's a benefit that this goes into 4.0 since in major versions breaking changes are expected. I'm sure that nobody will miss using these fields! If they do, they should remain on Pulsar 3.x.

There won't even be a need to troubleshoot stuck-consumer issues in the PIP-379 design in the same way. There's a replacement for cases where a consumer really doesn't acknowledge a message, and there's also an optional REST API for finding the actual message IDs of the unacked messages, which can be used to assist troubleshooting.

@codelipenghui (Contributor) commented Sep 19, 2024:

That is a breaking change: we changed the REST API without any smooth transition. The field consumersAfterMarkDeletePosition will be deleted, and some users might have built metrics or other tooling on this field. We need to keep the default behavior consistent for all the APIs exposed to users. We can add more fields, but we should not remove an existing one or change its meaning.

3.0 to 4.0 should not have breaking changes; we promised users that they can upgrade from 3.0 to 4.0 directly without any compatibility issues. https://pulsar.apache.org/contribute/release-policy/#compatibility-between-releases.

Member Author:

That is a breaking change: we changed the REST API without any smooth transition. The field consumersAfterMarkDeletePosition will be deleted, and some users might have built metrics or other tooling on this field. We need to keep the default behavior consistent for all the APIs exposed to users. We can add more fields, but we should not remove an existing one or change its meaning.

3.0 to 4.0 should not have breaking changes; we promised users that they can upgrade from 3.0 to 4.0 directly without any compatibility issues. https://pulsar.apache.org/contribute/release-policy/#compatibility-between-releases.

It's a change, but not a breaking change. This is not a compatibility issue in the sense that our release policy expresses; that policy is about BookKeeper storage formats and similarly major details.

@codelipenghui (Contributor) commented Sep 19, 2024:

Sorry, I can't agree. If we follow this reasoning, we could delete or modify any output of the REST API.

Member Author:

@codelipenghui PIP-282 already changed the fields and the meaning of the fields. Have you considered that?

Contributor:

Yes, but the proposal PIP-282 should have provided an option for users to migrate smoothly, instead of changing the meaning or renaming the output of a public API directly.


The updated contract explicitly states that it is not possible to retain key-based ordering of messages when negative acknowledgements are used. Changing this is out of scope for PIP-379. A potential future solution for handling this would be to modify the client so that when a message is negatively acknowledged, it would also reject all further messages with the same key until the original message gets redelivered. It's already possible to attempt to implement this in client-side code. However, a proper solution would require support on the broker side to block further delivery of the specific key when there are pending negatively acknowledged messages until all negatively acknowledged messages for that particular key have been acknowledged by the consumer. This solution is out of scope for PIP-379. A future implementation to address these problems could build upon PIP-379 concepts such as "draining hashes" and extend that to cover the negative acknowledgement scenarios.
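
As a sketch of the client-side idea mentioned above, the key-blocking bookkeeping could look like this (all names here are hypothetical; a real implementation would wrap the Pulsar consumer receive loop, calling Consumer.negativeAcknowledge for rejected messages):

```java
import java.util.HashMap;
import java.util.Map;

// After a message with key K is negatively acknowledged, reject every later
// message with key K until the original message (identified by its message ID)
// is redelivered. Illustrative only; not part of PIP-379 itself.
class NackKeyBlocker {
    private final Map<String, String> nackedKeyToMessageId = new HashMap<>();

    // Returns true if the message should be rejected (nacked) without processing.
    boolean shouldReject(String key, String messageId) {
        String original = nackedKeyToMessageId.get(key);
        return original != null && !original.equals(messageId);
    }

    // Record the first negatively acknowledged message ID for this key.
    void onNegativeAck(String key, String messageId) {
        nackedKeyToMessageId.putIfAbsent(key, messageId);
    }

    // The key is unblocked once its original message has been processed and acked.
    void onProcessed(String key) {
        nackedKeyToMessageId.remove(key);
    }
}
```

This only approximates the behavior on the client side; as the paragraph above notes, a proper solution would need broker-side support to block delivery of the key.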

### High-Level implementation plan
Contributor:

I tried to understand how the new mechanism works, but I don't really understand how it can reduce "Unnecessary Message Blocking". @lhotari Could you please give a simple example of what happens when the first consumer joins, then the second and third consumers, so that we can understand the approach clearly?

And I think @equanz also has questions mostly about this part.

Member Author:

@codelipenghui

I'll improve the document later to include a better explanation.

I'll try to describe it in this comment as well. There is "necessary message blocking" that is absolutely necessary to uphold the invariant: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time."
All other message blocking outside of this is unnecessary.

Since we only block messages to ensure that the invariant holds, this approach reduces all unnecessary blocking.

The invariant says that the same key cannot be in an unacked state for multiple consumers at once. As an implementation detail, the tracking is done at the hash level instead of the key level; the invariant still holds. If all "unnecessary blocking" were eliminated, the tracking would be at the key level. However, we want to make a tradeoff between memory usage and minimal blocking, and that's why using a hash makes sense in the implementation.

Does this explanation of "unnecessary" and "necessary" blocking make sense?

I could add the simple example to the documentation to explain how the solution would work.
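
In the meantime, here is a small illustrative walk-through (my own example, using a plain map in place of the real data structure): C1 initially owns the whole hash range and has two unacked messages on hash 20; C2 then joins and is assigned the sub-range containing hash 20.

```java
import java.util.HashMap;
import java.util.Map;

// hash -> {drainingConsumerId, refCount}; illustrative only.
Map<Integer, long[]> draining = new HashMap<>();
long c1 = 1, c2 = 2;

// C2 joins: hash 20 moves from C1 to C2 while C1 still has 2 unacked messages
// on it, so hash 20 enters the draining set with refCount 2.
draining.put(20, new long[] {c1, 2});

// Delivery checks ("should a message with this hash be blocked?"):
boolean blockHash10 = draining.containsKey(10); // false: hash 10 stayed with C1
boolean blockHash20 = draining.containsKey(20); // true: still draining, C2 must wait

// C1 acks one pending message on hash 20: refCount 2 -> 1, still draining.
draining.get(20)[1]--;
boolean stillBlocked = draining.get(20)[1] > 0; // true

// C1 acks the last pending message: refCount reaches 0, hash 20 leaves the set.
draining.get(20)[1]--;
if (draining.get(20)[1] == 0) draining.remove(20);
boolean blockedAfterDrain = draining.containsKey(20); // false: delivery to C2 resumes
```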

Contributor:

@lhotari I understand what "unnecessary blocking" and "necessary message blocking" are. My question is that the document only shows the draining hashes at a very high level; with a step-by-step example of a consumer joining and leaving, it would be much easier to understand the approach.

My current guess at the solution: for each delivered message, we record its hash as hash -> (consumer ID + counter), increase the counter if the hash already exists, and decrease the counter when the message gets acked?



Labels: doc, PIP, ready-to-test, type/PIP

3 participants