Unexpected behaviors of ConnectableFlux #3836
Hey, @dfa1! Thanks for the report. In order to properly address your concerns, let me try to list them and clarify:
For both of these claims, can you specify exactly how you measure the above? Now, on to the actual case: you use a unicast sink with an onBackpressureBuffer queue as the source.
The other aspect is combining it with publish().autoConnect(). Can you please provide a reproducer as a test case where it is clear what you are validating against your expectations, and also what the concurrency is on each side of the equation (producer and consumers)? Thanks.
@chemicL thanks :)
Yes, I have a metric based on the sink's buffered count, which is how I observed the buffer growing beyond the configured limit.
Yes, the callback always happens in the same thread. That thread is also used to read from a socket with a custom protocol, so the idea is to "free" this thread from any other work that would slow down the I/O.
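As an aside, here is a minimal sketch (not from the thread) of one way to hand work off a callback thread, using publishOn; the reproducers below use subscribeOn on each subscriber instead, and the upstream registration call is a hypothetical placeholder:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

// The upstream callback thread only enqueues into the sink and returns immediately.
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
upstream.onMessage(msg -> sink.tryEmitNext(msg)); // hypothetical upstream API

// publishOn moves the fan-out and all downstream work onto another scheduler,
// so the socket-reading thread is not slowed down by subscriber-side mapping.
Flux<String> recv = sink.asFlux()
        .publishOn(Schedulers.boundedElastic())
        .publish()
        .autoConnect();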
Can you please elaborate on that a bit? Initially I used a different setup; I will try, as I also need to better understand the issue.
@chemicL I'm still trying to understand the issue, but I cannot really understand how publish().autoConnect() uses the backpressure buffer. I see that sometimes the buffer increases without any subscriptions; I'm checking it via the Scannable.Attr.BUFFERED attribute (as in the reproducer below).
@simonbasle can you help here, please?
@dfa1 I can try to help address your concerns once you provide a reproducer where you show exactly what you measure and what doesn't meet your expectations; otherwise I am not sure what we are trying to achieve. A test case, a zip, or a repository would be appreciated. Otherwise I will need to mark this as "for stackoverflow" and close it, as it doesn't meet the criteria described in our contributing guidelines at this point.
@chemicL here is the minimal reproducer: I tried to reproduce our scenario without using too many internal classes. On my machine (i7-10850H, 12 cores, 32GB of RAM) with JDK 21, I can go up to 4000 subscriptions before getting dropped messages. My question is: are there any known limitations in ConnectableFlux with this many subscribers?

package org.example;
import org.junit.jupiter.api.Test;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import java.time.Duration;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import static reactor.core.publisher.Sinks.many;
class MainTest {
@Test
void repro() throws InterruptedException {
// config
final int numberOfSubscriptions = 10;
final int bufferSize = 1000;
final boolean immediateScheduler = false;
final boolean extraLogging = false;
final Duration monitoringInterval = Duration.ofSeconds(1);
// sink
final Queue<String> queue = Queues.<String>get(bufferSize).get();
final Sinks.Many<String> sink = many().unicast().onBackpressureBuffer(queue);
// monitoring
final AtomicLong consumed = new AtomicLong();
final AtomicLong dropped = new AtomicLong();
Flux.interval(monitoringInterval)
.doOnNext(i -> {
int size = sink.scanOrDefault(Scannable.Attr.BUFFERED, 0);
System.out.println("buffer size is " + size);
System.out.println("consumed " + consumed.get());
System.out.println("dropped " + dropped.get());
}
).subscribeOn(Schedulers.parallel())
.subscribe();
// fast producer (blocking service)
final Thread producer = new Thread(() -> {
long i = 0;
while (true) {
Sinks.EmitResult emitResult = sink.tryEmitNext(String.valueOf(i++));
if (emitResult != Sinks.EmitResult.OK) {
dropped.incrementAndGet();
}
}
});
final Flux<String> recv = sink.asFlux()
.publish()
.autoConnect();
// simulating multiple slow consumers
for (int i = 0; i < numberOfSubscriptions; i++) {
final String subscription = String.valueOf(i);
recv
.map(e -> {
if (extraLogging) {
System.out.printf("%s on %s%n", e, subscription);
}
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return UUID.nameUUIDFromBytes(bytes).toString();
})
.doOnNext(e -> consumed.incrementAndGet())
.subscribeOn(immediateScheduler ? Schedulers.immediate() : Schedulers.parallel())
.subscribe()
;
}
producer.start();
producer.join();// will keep the test thread alive
}
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>reactor-repro</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.8</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
I ran your reproducer on my M1 Mac with JDK 21; the results are summarized below.

One problem you mentioned was that the buffer size grows beyond what you expect. The implementation of the backpressure buffer rounds the requested capacity up: with 1000 as the argument, it finds the next power of 2, which turns out to be 1024 in this case. The experiments don't show the buffer growing beyond that value. When I disable the buffer scanning, for 11000 it still behaves unpredictably; I suppose that this is also expected.

Once the buffer capacity is reached, it is difficult to transfer draining ownership. In one case it might be the consumer that starts draining and can't catch up with the busy producer, as it has more work to do. In the other case it's the producer that enters the drain loop and stalls its own producing, so you don't see the effect. What comes to mind is a backoff policy on the producer side to give the consumer some room to catch up (see the sketch below). What you effectively see is the disconnect that reactive programming tries to address: there is no consumer-producer communication of demand.

Were you able to avoid the callback handler and instead provide an end-to-end reactive pipeline?

Please let me know if there's anything else here that I missed. Otherwise, I'll close the issue as I don't see an apparent bug demonstrated by the example.
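A minimal sketch of such a producer-side backoff, assuming the producer keeps using tryEmitNext and simply pauses and retries when the sink reports overflow (the backoff strategy and names are illustrative, not a prescribed API):

import java.util.concurrent.locks.LockSupport;
import reactor.core.publisher.Sinks;

// Assuming an existing Sinks.Many<String> sink, as in the reproducer above.
Thread producer = new Thread(() -> {
    long i = 0;
    while (true) {
        Sinks.EmitResult result = sink.tryEmitNext(String.valueOf(i));
        if (result == Sinks.EmitResult.OK) {
            i++; // only advance once the item has been accepted
        } else if (result == Sinks.EmitResult.FAIL_OVERFLOW) {
            // Buffer is full: pause briefly so consumers get a chance to drain.
            LockSupport.parkNanos(1_000); // or Thread.onSpinWait() for a busier wait
        } else {
            break; // cancelled/terminated sink: stop producing
        }
    }
});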
Closing. Feel free to reopen in case there's more to add. Thanks.
@chemicL thanks for your time, I failed to reply on time because of holidays! :-) I have 2 points that maybe you can consider:
So, I did another reproducer (see below): this time it creates N subscriptions and a producer that generates a random number that is delivered to only 1 subscription. In this case, the consumer is also very fast (just an integer comparison). On my machine, I start to see dropped messages around 5K subscriptions.

Here is the code, again tested with JDK 21 and reactor-core 3.6.8:

import org.junit.jupiter.api.Test;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import static reactor.core.publisher.Sinks.many;
class MainTest {
@Test
void repro() throws InterruptedException {
// config
final int numberOfSubscriptions = 5_000;
final int bufferSize = 1000;
final boolean immediateScheduler = false;
final boolean extraLogging = false;
final boolean useMulticast = true;
final Duration monitoringInterval = Duration.ofSeconds(1);
// sink
final Sinks.Many<Integer> sink;
if (useMulticast) {
sink = many().multicast().onBackpressureBuffer(bufferSize);
} else {
final Queue<Integer> queue = Queues.<Integer>get(bufferSize).get();
sink = many().unicast().onBackpressureBuffer(queue);
}
// monitoring
final AtomicLong consumed = new AtomicLong();
final AtomicLong dropped = new AtomicLong();
Flux.interval(monitoringInterval)
.doOnNext(i -> {
int size = sink.scanOrDefault(Scannable.Attr.BUFFERED, 0);
System.out.println("tick " + i);
System.out.println(" buffer size is " + size);
System.out.println(" consumed " + consumed.get());
System.out.println(" dropped " + dropped.get());
System.out.println();
}
).subscribeOn(Schedulers.parallel())
.subscribe();
// fast producer (blocking service)
final Thread producer = new Thread(() -> {
while (true) {
ThreadLocalRandom current = ThreadLocalRandom.current();
Sinks.EmitResult emitResult = sink.tryEmitNext(current.nextInt(numberOfSubscriptions));
if (emitResult != Sinks.EmitResult.OK) {
dropped.incrementAndGet();
}
}
}, "Producer");
final Flux<Integer> recv;
if (useMulticast) {
recv = sink.asFlux();
} else {
recv = sink.asFlux()
.publish()
.autoConnect();
}
// simulating multiple slow consumers
for (int i = 0; i < numberOfSubscriptions; i++) {
final int wantedItem = i;
recv
.mapNotNull(item -> {
if (item.equals(wantedItem)) {
return item;
} else {
return null;
}
})
.map(item -> {
if (extraLogging) {
System.out.printf("got %s%n", item);
}
return String.valueOf(item);
})
.doOnNext(e -> consumed.incrementAndGet())
.subscribeOn(immediateScheduler ? Schedulers.immediate() : Schedulers.parallel())
.subscribe()
;
}
producer.start();
producer.join();// will keep the test thread alive
}
}
There is not that much of a difference with the added bits. The new scenario in reality does not dispatch a single particular item to 1 out of N subscribers; it delivers every item to all subscribers, and each subscriber then decides whether the item should be pushed downstream. Every subscriber receives every generated number, but it only delivers the one dedicated to it downstream. So the behaviour is still the same; swapping the unicast sink + publish().autoConnect() for a multicast sink doesn't change that.
ok, so:
I am not familiar with exactly how your app is built. Assuming what you already said is the case (one callback-based producer + many consumers in the form of WebSockets through which you push data to clients of your service), have you considered a Sink per WebSocket as an inversion of your proposed solution? Regarding the documentation: that makes sense. If you'd be open to submitting a PR improving the javadoc, feel free; otherwise, please open a dedicated issue. Thanks!
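For illustration, a minimal sketch of that sink-per-WebSocket inversion, assuming each generated number identifies the target subscriber as in the second reproducer (the registry class and routing are illustrative assumptions, not an API from this thread):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class PerSubscriberDispatch {
    // One sink per WebSocket/subscriber, so the busy producer only touches
    // the queue of the single consumer that actually wants the item.
    private final Map<Integer, Sinks.Many<Integer>> sinks = new ConcurrentHashMap<>();

    // Each WebSocket session registers its own sink and subscribes to the returned flux.
    Flux<Integer> registerSubscriber(int subscriberId) {
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer();
        sinks.put(subscriberId, sink);
        return sink.asFlux();
    }

    // The callback-based producer routes each item to exactly one sink.
    void onUpstreamItem(int item) {
        Sinks.Many<Integer> target = sinks.get(item); // here the item is the subscriber id
        if (target != null && target.tryEmitNext(item) != Sinks.EmitResult.OK) {
            // Overflow or terminated sink: drop, count, or back off as discussed above.
        }
    }
}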
ConnectableFlux has some unexpected behavior when there are many subscribers (>1000). We have an upstream system that provides a callback method which we use to feed a sink; the sink is then exposed as a recv flux, and that recv flux is shared between hundreds of subscribers (a reconstructed sketch of this wiring is shown below). The subscribers have some mapping logic and then the output is a WebSocket message.
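For context, a minimal sketch of that wiring, reconstructed from the reproducers earlier in this thread (the upstream callback registration and the WebSocket write are hypothetical placeholders; the sink and recv setup follows the first reproducer):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

// The upstream callback feeds a unicast sink with a bounded buffer (limit ~20K).
Sinks.Many<String> sink = Sinks.many().unicast()
        .onBackpressureBuffer(Queues.<String>get(20_000).get());
upstream.onMessage(msg -> sink.tryEmitNext(msg)); // hypothetical upstream API

// The sink is shared as a hot flux among hundreds of subscribers.
Flux<String> recv = sink.asFlux()
        .publish()
        .autoConnect();

// Each subscriber maps the item and writes the result to its WebSocket.
recv.map(msg -> toWebSocketPayload(msg))          // hypothetical mapping
    .subscribe(payload -> session.send(payload)); // hypothetical WebSocket write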
Expected Behavior

Two things:
- the sink buffer should not grow beyond its configured limit, which I measure via sink.scan(Scannable.Attr.BUFFERED) in an interval flux;
- messages should not be dropped while the buffer still has capacity.

Actual Behavior

Right now I'm observing the value of the sink buffer going beyond 200K, but the limit is 20K.
Steps to Reproduce
Possible Solution
Your Environment

Reactor version(s) used:
- reactor-core 3.6.4
- reactor-netty-http 1.1.17

Other relevant libraries versions (e.g. netty, ...): netty 4.1.107

JVM version (java -version): JDK 21 (Zulu)

OS and version (e.g. uname -a): Windows 11 PRO, but the same can be observed in Linux (UBI 8)