
Unexpected behaviors of ConnectableFlux #3836

Closed
dfa1 opened this issue Jul 1, 2024 · 11 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter

Comments

@dfa1
Contributor

dfa1 commented Jul 1, 2024

ConnectableFlux has some unexpected behavior when there are many subscribers (>1000).

We have an upstream system that provides a callback method, which we use as follows:

final Queue<Message> queue = Queues.<Message>get(maxBackPressure).get();
final Sinks.Many<Message> sink = many().unicast().onBackpressureBuffer(queue);
final Scheduler single = Schedulers.newSingle("Mapping-" + index);
final Flux<Message> recv = sink.asFlux()
        .publishOn(single)
        .publish()
        .autoConnect();

The sink is then used as:

@Override
public void handleMsg(Message message) {
    final Sinks.EmitResult result = sink.tryEmitNext(message);
    // handle errors
}

Finally, the recv flux is shared between hundreds of subscribers.
The subscribers apply some mapping logic and then the output is sent as a WebSocket message.

Expected Behavior

Two things:

  • the buffer of the sink grows beyond the limit used for the queue above: I often observe values going beyond 10x the limit by polling sink.scan(Scannable.Attr.BUFFERED) from an interval flux, as sketched below;
  • on my machine, when the number of subscribers goes beyond 2000, a lot of messages are dropped: here I don't really understand what the bottleneck is in the profiler. Is there any known limit on the number of supported subscribers?
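A minimal sketch of that measurement loop, using the sink defined above (the 1-second interval is an arbitrary choice; the full reproducer further down does essentially the same):

Flux.interval(Duration.ofSeconds(1))
        .doOnNext(tick -> {
            // BUFFERED reports how many items are currently queued in the sink
            int buffered = sink.scanOrDefault(Scannable.Attr.BUFFERED, 0);
            System.out.println("buffered: " + buffered);
        })
        .subscribeOn(Schedulers.parallel())
        .subscribe();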

Actual Behavior

Right now I'm observing the sink buffer value going beyond 200K, while the limit is 20K.

Steps to Reproduce

@Test
void reproCase() {
  // TODO
}

Possible Solution

Your Environment

  • Reactor version(s) used:
    reactor-core 3.6.4
    reactor-netty-http 1.1.17

  • Other relevant libraries versions (eg. netty, ...): netty 4.1.107

  • JVM version (java -version): JDK 21 (Zulu)

  • OS and version (eg uname -a): Windows 11 PRO but same can be observed in Linux (UBI 8)

@chemicL
Member

chemicL commented Jul 2, 2024

Hey, @dfa1 !

Thanks for the report. In order to properly address your concerns, let me try to list them and clarify:

  1. "the buffer of the sink is growing beyond the limit used in the queue above" -> you mean the queue.size() reports as being larger?
  2. "a lot of messages are dropped" -> can you please be specific what "dropped" means in this case?

For both of these claims, can you specify exactly how you measure the above?

Now, on to the actual case -> You use Queues.get() to obtain the Queue. Is the producer (handleMsg method) always serialized? The Queues class' Javadoc states:

 * Queue utilities and suppliers for 1-producer/1-consumer ready queues adapted for
 * various given capacities.

The other aspect is combining Sinks.many().unicast(), which is meant for a single Subscriber, with .publishOn(single).publish().autoConnect(), which has known issues with late-arriving subscribers.
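To make the distinction concrete, here is a minimal sketch of the multicast flavor, which is designed for many subscribers (the class and method names and the bufferSize parameter are only illustrative, not a prescription for your app):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class MulticastSketch {

    // Illustrative only: a multicast sink accepts many subscribers directly,
    // so the unicast + publishOn + publish().autoConnect() chain is not needed.
    static Sinks.Many<String> newSharedSink(int bufferSize) {
        return Sinks.many().multicast().onBackpressureBuffer(bufferSize);
    }

    static Flux<String> sharedView(Sinks.Many<String> sink) {
        // Every subscriber of this Flux shares the same live stream of items.
        return sink.asFlux();
    }
}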

Can you please provide a reproducer as a test case where it would be clear what you are validating against your expectations and also the concurrency from each side of the equation (Sink's producer and the autoConnect consumer)?

Thanks.

@chemicL chemicL added the status/need-user-input This needs user input to proceed label Jul 2, 2024
@dfa1
Contributor Author

dfa1 commented Jul 2, 2024

@chemicL thanks :)

"the buffer of the sink is growing beyond the limit used in the queue above" -> you mean the queue.size() reports as being larger?

yes, I have a metric based on sink.scan(Scannable.Attr.BUFFERED) that reports much more than the configured size of the queue.

> can you please be specific what "dropped" means in this case?

It means that sink.tryEmitNext yields an error, usually FAIL_OVERFLOW.

> Now, on to the actual case -> You use Queues.get() to obtain the Queue. Is the producer (handleMsg method) always serialized? The Queues class' Javadoc states:

Yes, the callback always happens on the same thread. That thread is also used to read from a socket with a custom protocol, so the idea is to free it from any other work that would slow down the I/O.

> The other aspect is combining Sinks.many().unicast(), which is meant for a single Subscriber, with .publishOn(single).publish().autoConnect(), which has known issues with late-arriving subscribers.

Can you please elaborate on that a bit? Initially I used many().multicast() as the sink, together with publish().autoConnect() on the flux, but according to my tests it is a bit faster with unicast().

> Can you please provide a reproducer as a test case where it would be clear what you are validating against your expectations and also the concurrency from each side of the equation (Sink's producer and the autoConnect consumer)?

I will try, as I also need to better understand the issue.

@dfa1
Contributor Author

dfa1 commented Jul 16, 2024

@chemicL I'm still trying to understand the issue, but I cannot really understand how publish().autoConnect() uses the backpressure buffer. I see that sometimes the buffer increases without any subscriptions... I'm checking it with

sink.scan(Scannable.Attr.BUFFERED)

@simonbasle can you help here please?

@chemicL
Member

chemicL commented Jul 16, 2024

@dfa1 I can try to help address your concerns once you provide a reproducer that shows exactly what you measure and what doesn't meet your expectations; otherwise I am not sure what we are trying to achieve. A test case, a zip, or a repository would be appreciated. Otherwise I will need to mark this as "for stackoverflow" and close it, as it doesn't meet the criteria described in our contributing guidelines at this point.

@dfa1
Contributor Author

dfa1 commented Jul 16, 2024

@chemicL here is a minimal reproducer; I tried to reproduce our scenario without using too many internal classes.
Basically there is a fast producer and many slow, CPU-bound consumers (sending JSON over WebSocket).

On my machine (i7-10850H, 12 cores, 32GB of RAM) with JDK 21, I can go up to 4000 subscriptions before getting dropped messages. My question is: are there any known limitations in publish() + autoConnect()?

package org.example;

import org.junit.jupiter.api.Test;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import static reactor.core.publisher.Sinks.many;

class MainTest {

    @Test
    void repro() throws InterruptedException {
        // config
        final int numberOfSubscriptions = 10;
        final int bufferSize = 1000;
        final boolean immediateScheduler = false;
        final boolean extraLogging = false;
        final Duration monitoringInterval = Duration.ofSeconds(1);

        // sink
        final Queue<String> queue = Queues.<String>get(bufferSize).get();
        final Sinks.Many<String> sink = many().unicast().onBackpressureBuffer(queue);

        // monitoring
        final AtomicLong consumed = new AtomicLong();
        final AtomicLong dropped = new AtomicLong();

        Flux.interval(monitoringInterval)
                .doOnNext(i -> {
                            int size = sink.scanOrDefault(Scannable.Attr.BUFFERED, 0);
                            System.out.println("buffer size is " + size);
                            System.out.println("consumed " + consumed.get());
                            System.out.println("dropped " + dropped.get());

                        }
                ).subscribeOn(Schedulers.parallel())
                .subscribe();

        // fast producer (blocking service)
        final Thread producer = new Thread(() -> {
            long i = 0;
            while (true) {
                Sinks.EmitResult emitResult = sink.tryEmitNext(String.valueOf(i++));
                if (emitResult != Sinks.EmitResult.OK) {
                    dropped.incrementAndGet();
                }
            }
        });

        final Flux<String> recv = sink.asFlux()
                .publish()
                .autoConnect();

        // simulating multiple slow consumers
        for (int i = 0; i < numberOfSubscriptions; i++) {
            final String subscription = String.valueOf(i);
            recv
                    .map(e -> {
                        if (extraLogging) {
                            System.out.printf("%s on %s%n", e, subscription);
                        }
                        byte[] bytes = new byte[16];
                        ThreadLocalRandom.current().nextBytes(bytes);
                        return UUID.nameUUIDFromBytes(bytes).toString();
                    })
                    .doOnNext(e -> consumed.incrementAndGet())
                    .subscribeOn(immediateScheduler ? Schedulers.immediate() : Schedulers.parallel())
                    .subscribe()
            ;
        }
        producer.start();
        producer.join(); // keeps the test thread alive
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>reactor-repro</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.6.8</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.10.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>

@chemicL chemicL added status/need-investigation This needs more in-depth investigation and removed status/need-user-input This needs user input to proceed labels Jul 24, 2024
@chemicL
Member

chemicL commented Jul 25, 2024

I ran your reproducer on my M1 Mac. With JDK 21, here are my results:

  • with a 1s monitoring interval I'm able to run 9000 subscriptions with mostly no visible buffering
    • when I go up to 11000 it starts fluctuating from run to run, and either drops significantly while keeping the buffer full (it reports 1024) or keeps delivering items flawlessly with the buffer at size 0
  • with a 100ms monitoring interval the 9000 case becomes less stable and more frequently ends up dropping

The implementation of SpscArrayQueue#size can cause some unexpected results when the CPU caches need to frequently exchange information about the volatile variables used for the consumer and producer, which can slow everything down; the fact that you're observing the stream can have a negative impact.

One problem you mentioned was that the buffer size grows beyond what you expect. With 1000 as the argument, the implementation rounds up to the next power of 2, which turns out to be 1024 in this case. From the experiments it doesn't look like the buffer grows beyond that value.
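For illustration, a tiny standalone sketch of that rounding (not Reactor's actual code, just the same ceiling-to-power-of-two idea):

class CapacitySketch {

    // Rounds a requested capacity up to the next power of two,
    // mirroring why asking Queues.get for 1000 ends up with a 1024-slot queue.
    static int ceilingNextPowerOfTwo(int requested) {
        return 1 << (32 - Integer.numberOfLeadingZeros(requested - 1));
    }

    public static void main(String[] args) {
        System.out.println(ceilingNextPowerOfTwo(1000)); // prints 1024
    }
}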

When I disable the buffer scanning, for 11000 it still behaves unpredictably. I suppose that this is also expected. Once the buffer capacity is reached it is difficult to transfer draining ownership. In one case it might be the consumer that starts draining and can't catch up with the busy producer as it has more work to do. In the other case it's the producer that enters the drain loop and is able to stall its own producing so you don't see the effect.

What comes to mind is a backoff policy on the producer side to give the consumer some room to catch up. What you effectively see is the disconnect that reactive programming tries to address: there is no consumer-producer communication of demand. If you were able to avoid the callback handler and instead provide an end-to-end Publisher-oriented API, your app would be able to communicate backpressure properly. If that's not possible, some backoff strategy needs to happen at a higher level. Hope this helps.
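As a rough sketch of such a backoff, assuming the callback shape from your first snippet (String stands in for Message, and the retry count and pause are arbitrary values to tune):

import java.util.concurrent.locks.LockSupport;

import reactor.core.publisher.Sinks;

class BackoffProducerSketch {

    private final Sinks.Many<String> sink;

    BackoffProducerSketch(Sinks.Many<String> sink) {
        this.sink = sink;
    }

    // Invoked by the single upstream callback thread, as in the original report.
    void handleMsg(String message) {
        Sinks.EmitResult result = sink.tryEmitNext(message);
        int attempts = 0;
        // While the buffer is full, pause briefly so the consumers can drain,
        // then retry; after a bounded number of attempts the message is given up on.
        while (result == Sinks.EmitResult.FAIL_OVERFLOW && attempts++ < 100) {
            LockSupport.parkNanos(100_000L); // ~0.1 ms, an arbitrary choice
            result = sink.tryEmitNext(message);
        }
        // result can still be a failure here (e.g. dropped after all retries);
        // count or log it as appropriate.
    }
}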

Please let me know if there's anything else here that I missed. Otherwise, I'll close the issue as I don't see an apparent bug demonstrated by the example.

@chemicL chemicL added status/need-user-input This needs user input to proceed and removed status/need-investigation This needs more in-depth investigation labels Jul 25, 2024
@chemicL
Member

chemicL commented Aug 1, 2024

Closing. Feel free to reopen in case there's more to add. Thanks.

@chemicL chemicL closed this as not planned on Aug 1, 2024
@chemicL chemicL added for/stackoverflow Questions are best asked on SO or Gitter and removed status/need-user-input This needs user input to proceed labels Aug 1, 2024
@dfa1
Contributor Author

dfa1 commented Aug 5, 2024

@chemicL thanks for your time; I failed to reply in time because of holidays! :-)

I have 2 points that maybe you can consider:

  • would it be possible to document the ceilingNextPowerOfTwo behavior in the javadoc for Queues.get? It was really surprising to see a different number as the buffer size! Anyway, I'm OK on this point... thanks for the answer;

  • I see your point about backpressure, but I cannot control the producer and I have only a callback interface to implement.

So, I wrote another reproducer (see below): this time it creates N subscriptions and a producer that generates a random number delivered to only one subscription. In this case, the consumer is also very fast (just an integer comparison).

=> on my machine, I start to see dropped messages at around 5K subscriptions with publish() + autoConnect(). I also added a multicast sink for comparison, and it has similar limitations.

Here is the code, again tested with JDK 21 and reactor-core 3.6.8:

import org.junit.jupiter.api.Test;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import static reactor.core.publisher.Sinks.many;

class MainTest {

    @Test
    void repro() throws InterruptedException {
        // config
        final int numberOfSubscriptions = 5_000;
        final int bufferSize = 1000;
        final boolean immediateScheduler = false;
        final boolean extraLogging = false;
        final boolean useMulticast = true;
        final Duration monitoringInterval = Duration.ofSeconds(1);

        // sink
        final Sinks.Many<Integer> sink;
        if (useMulticast) {
            sink = many().multicast().onBackpressureBuffer(bufferSize);
        } else {
            final Queue<Integer> queue = Queues.<Integer>get(bufferSize).get();
            sink = many().unicast().onBackpressureBuffer(queue);
        }

        // monitoring
        final AtomicLong consumed = new AtomicLong();
        final AtomicLong dropped = new AtomicLong();

        Flux.interval(monitoringInterval)
                .doOnNext(i -> {
                            int size = sink.scanOrDefault(Scannable.Attr.BUFFERED, 0);
                            System.out.println("tick " + i);
                            System.out.println("  buffer size is " + size);
                            System.out.println("  consumed " + consumed.get());
                            System.out.println("  dropped " + dropped.get());
                            System.out.println();
                        }
                ).subscribeOn(Schedulers.parallel())
                .subscribe();

        // fast producer (blocking service)
        final Thread producer = new Thread(() -> {
            while (true) {
                ThreadLocalRandom current = ThreadLocalRandom.current();
                Sinks.EmitResult emitResult = sink.tryEmitNext(current.nextInt(numberOfSubscriptions));
                if (emitResult != Sinks.EmitResult.OK) {
                    dropped.incrementAndGet();
                }
            }
        }, "Producer");

        final Flux<Integer> recv;

        if (useMulticast) {
            recv = sink.asFlux();
        } else {
            recv = sink.asFlux()
                    .publish()
                    .autoConnect();
        }

        // simulating multiple slow consumers
        for (int i = 0; i < numberOfSubscriptions; i++) {
            final int wantedItem = i;
            recv
                    .mapNotNull(item -> {
                        if (item.equals(wantedItem)) {
                            return item;
                        } else {
                            return null;
                        }
                    })
                    .map(item -> {
                        if (extraLogging) {
                            System.out.printf("got %s%n", item);
                        }
                        return String.valueOf(item);
                    })
                    .doOnNext(e -> consumed.incrementAndGet())
                    .subscribeOn(immediateScheduler ? Schedulers.immediate() : Schedulers.parallel())
                    .subscribe()
            ;
        }
        producer.start();
        producer.join(); // keeps the test thread alive
    }

}

@chemicL
Member

chemicL commented Aug 6, 2024

There is not that much of a difference with the added bits. The new scenario does not actually dispatch a single particular item to 1 out of N subscribers; it delivers every item to all subscribers, and each subscriber then decides whether the item should be pushed downstream.

In other words, every subscriber receives every generated number, but only delivers the one dedicated to it downstream.

So the behaviour is still the same. Swapping unicast with multicast gets rid of the potential buffering nature, as explained in the reference documentation.

@dfa1
Contributor Author

dfa1 commented Aug 6, 2024

OK, so:

  • is there no better way to dispatch items to only one subscriber? e.g. an existing operator, or the possibility to write a custom operator;
  • don't you see anything that could be added to the documentation/javadoc? i.e. the buffer size of Queues.get

@chemicL
Member

chemicL commented Aug 8, 2024

I am not familiar with exactly how your app is built. Assuming what you already said is the case (one callback-based producer plus many consumers in the form of WebSockets through which you push data to clients of your service), have you considered a Sink per WebSocket as an inversion of your proposed solution?
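A rough sketch of that inversion, assuming each upstream item carries the id of the subscription it is meant for (the class and method names and the Integer payload here are hypothetical):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

class PerSubscriberDispatchSketch {

    // One bounded unicast sink per WebSocket session, keyed by a subscription id.
    private final Map<Integer, Sinks.Many<Integer>> sinks = new ConcurrentHashMap<>();

    Flux<Integer> register(int subscriptionId, int bufferSize) {
        Queue<Integer> queue = Queues.<Integer>get(bufferSize).get();
        Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer(queue);
        sinks.put(subscriptionId, sink);
        return sink.asFlux(); // the WebSocket handler subscribes to exactly this Flux
    }

    // Called from the single upstream callback thread: each item is routed to one
    // sink instead of being broadcast to every subscriber and filtered downstream.
    void handleMsg(int targetSubscription, Integer item) {
        Sinks.Many<Integer> sink = sinks.get(targetSubscription);
        if (sink != null && sink.tryEmitNext(item) != Sinks.EmitResult.OK) {
            // overflow or terminated sink; count or log the drop as in the reproducers
        }
    }
}

This way a slow WebSocket only backs up its own bounded queue instead of the single shared one.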

Regarding the documentation, that makes sense: if you'd be open to submitting a PR improving the javadoc, feel free; otherwise, please open a dedicated issue. Thanks!
