BufferTimeoutSubscriber is not thread safe #3738
Comments
Thank you for the report @jamiestewart 👍 There is another implementation of the Regarding this issue, we'd need:
Are you considering a contribution, @jamiestewart?
Is it possible for you to upgrade to a newer version, @jamiestewart? Is this problem still present?
@jamiestewart can you please try the latest version of reactor, or 3.5.18 at minimum, and use
Closing, as this issue is known and the fairBackpressure variant is thread-safe and should be used instead.
This issue seems to be very similar in cause and solution to reactor/reactor-core#2362. I recommend that you read that issue before continuing further here.
As in reactor/reactor-core#2362, synchronization is present around many of BufferTimeoutSubscriber's operations on its buffer, but BufferTimeoutSubscriber.cancel() calls Operators.onDiscardMultiple() (which iterates through the buffer) with no synchronization, so another thread is not prevented from modifying the buffer while the cancel is underway. This can cause multiple failures.
The most problematic of these failures is ArrayIndexOutOfBoundsException, thrown from:
This can occur if the buffer is being modified by one thread calling onNext() while another thread calls cancel(), so the cancelling thread is calling List.clear() while another thread is adding to the list.
Somewhat more common is a ConcurrentModificationException, thrown when the cancelling thread is iterating through the buffer while another thread modifies it. In anticipation of this problem, Operators.onDiscardMultiple() catches this exception and logs it at WARN level, which prevents it from propagating, but it can be worrisome for application administrators, and it also means some discarded elements are never passed to the hook. Below is an example of the logged ConcurrentModificationException:
It looks like reactor.core.publisher.FluxBufferTimeout.BufferTimeoutSubscriber.cancel() violates the reactive-streams specification:
BufferTimeoutSubscriber.cancel is clearly not thread safe with respect to onNext: one thread can run cancel, iterating over the buffer to dispose of its elements (via Operators.onDiscardMultiple), while another thread calls onNext to add new elements to the buffer. This causes ConcurrentModificationException to be thrown (at least when the underlying buffer is an ArrayList, which it is by default).
Expected Behavior
BufferTimeoutSubscriber.cancel MUST return normally, even if another thread is calling BufferTimeoutSubscriber.onNext.
Actual Behavior
On occasion, BufferTimeoutSubscriber.cancel is observed to throw ConcurrentModificationException if invoked while another thread is calling BufferTimeoutSubscriber.onNext.
Steps to Reproduce
Below is an imperfect unit test for exercising this behavior.
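The reporter's test is not shown here. As a rough stand-in, and not the reporter's code, the sketch below replays the problematic interleaving deterministically on a single thread using a plain ArrayList: an iterator is opened (as the discard walk in cancel would do), an element is appended (as onNext would do), and the next step of the walk then fails. The class and method names are hypothetical, and no Reactor types are involved; it only illustrates the fail-fast behavior of the default buffer type.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in: replays the cancel()/onNext() interleaving on one thread.
final class InterleavingReplay {

    // Returns true if the discard walk fails after an interleaved add.
    static boolean discardFailsAfterInterleavedAdd() {
        List<String> buffer = new ArrayList<>(List.of("a", "b", "c"));

        // "cancel" starts walking the buffer to discard its elements.
        Iterator<String> discarding = buffer.iterator();
        discarding.next();

        // "onNext" appends to the same buffer before the walk has finished.
        buffer.add("d");

        try {
            // The iterator's fail-fast check notices the structural change.
            discarding.next();
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("discard failed: " + discardFailsAfterInterleavedAdd());
    }
}
```

In a real multi-threaded run the same interleaving only happens occasionally, which is why a test that relies on actual thread timing can pass many times before it fails.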
Possible Solution
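One way to picture the change is the hedged sketch below. GuardedBuffer and GuardedBufferDemo are hypothetical, simplified stand-ins and are not Reactor's implementation; the only point they illustrate is that when both the add path and the discard path take the same monitor, the discard walk cannot overlap with an add.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified model of a buffering subscriber (not Reactor's code).
final class GuardedBuffer<T> {
    private final List<T> values = new ArrayList<>();
    private boolean cancelled;

    // Appends under the instance monitor; values arriving after cancel are dropped.
    synchronized void onNext(T value) {
        if (!cancelled) {
            values.add(value);
        }
    }

    // Discards under the same monitor, so no add can interleave with the walk.
    synchronized void cancel(Consumer<? super T> onDiscard) {
        cancelled = true;
        for (T value : values) {
            onDiscard.accept(value);
        }
        values.clear();
    }

    synchronized int size() {
        return values.size();
    }
}

final class GuardedBufferDemo {
    // Races a producer thread against cancel; returns the first error seen, or null.
    static Throwable raceOnce() {
        GuardedBuffer<Integer> buffer = new GuardedBuffer<>();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100_000; i++) {
                    buffer.onNext(i);
                }
            } catch (Throwable t) {
                failure.compareAndSet(null, t);
            }
        });
        producer.start();
        try {
            buffer.cancel(value -> { });
            producer.join();
        } catch (Throwable t) {
            failure.compareAndSet(null, t);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("error during race: " + raceOnce());
    }
}
```

A trade-off of this shape is that the discard callback runs while the lock is held, so a slow or re-entrant hook would delay producers for the duration of the cancel.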
As was done in the fix for #2362, synchronizing the implementation of BufferTimeoutSubscriber.cancel on this would address the problem.
My Environment
Reactor version(s) used: 3.4.21
JVM version (java -version):
openjdk version "17.0.2"
openjdk 17.0.2 2022-01-18
OpenJDK Runtime Environment (build 17.0.2+8-86)
OpenJDK 64-Bit Server VM (build 17.0.2+8-86, mixed mode, sharing)
OS and version (eg uname -a):
Darwin jstewart-mb01 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64