
Improve Flux.timeout efficiency #2845

Open
bruto1 opened this issue Nov 21, 2021 · 17 comments
Labels
area/performance This belongs to the performance theme for/user-attention This issue needs user attention (feedback, rework, etc...) help wanted We need contributions on this
Comments

@bruto1
Contributor

bruto1 commented Nov 21, 2021

The current implementation uses the parallel scheduler by default, which is in essence a ScheduledThreadPoolExecutor whose task queue is guarded by a ReentrantLock. That means two trips through said lock per signal on average (one to schedule the next timeout task, one to cancel the previous one).

After looking at Netty's https://github.com/netty/netty/blob/4.1/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java, there appears to be a way to do fewer timeout task reschedules if you let go of the idea illustrated by the current marble diagrams (each new signal cancels the previous timeout task).

WDYT?
This really prevents Flux.timeout from being useful with high-frequency publishers that may occasionally stall.

suggestion inspired by https://gitter.im/reactor/reactor?at=61968a44197fa95a1c7adf05
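
For illustration, here is a minimal sketch of the Netty-inspired idea: onNext only records a timestamp, and the timeout task re-arms itself instead of being cancelled and rescheduled per signal. All names (CoarseTimeoutSketch, onSignal, etc.) are illustrative, not actual reactor-core API.

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class CoarseTimeoutSketch {
    private final long timeoutNanos;
    private final ScheduledExecutorService timer;
    private volatile long lastSignalNanos;

    CoarseTimeoutSketch(Duration timeout, ScheduledExecutorService timer) {
        this.timeoutNanos = timeout.toNanos();
        this.timer = timer;
        this.lastSignalNanos = System.nanoTime();
        schedule(timeoutNanos);
    }

    // called from onNext: no scheduling, no cancellation, just a volatile write
    void onSignal() {
        lastSignalNanos = System.nanoTime();
    }

    private void schedule(long delayNanos) {
        timer.schedule(this::check, delayNanos, TimeUnit.NANOSECONDS);
    }

    private void check() {
        long idle = System.nanoTime() - lastSignalNanos;
        if (idle >= timeoutNanos) {
            onTimeout();                    // the stream really stalled
        } else {
            schedule(timeoutNanos - idle);  // re-arm for the remaining time only
        }
    }

    private void onTimeout() {
        // signal a TimeoutException or switch to the fallback publisher
    }
}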

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Nov 21, 2021
@simonbasle simonbasle added area/performance This belongs to the performance theme status/need-design This needs more in depth design work type/enhancement A general enhancement status/need-decision This needs a decision from the team and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Nov 22, 2021
@simonbasle
Member

In Netty, it looks like IdleStateHandler still uses some form of scheduling with a configurable EventExecutor... Not sure how this would translate to reactor-core and how different it would be.

I was going to suggest using a different Scheduler like Schedulers.newSingle() for that sort of timeout, but it also relies on ScheduledThreadPoolExecutor... Which makes sense, given that the only JDK implementation of ScheduledExecutorService is ScheduledThreadPoolExecutor 😓

One thing to keep in mind is that currently the underlying implementation of timeout(Duration) is actually timeout(Publisher), where the publisher is a Mono.delay. The Netty strategy might be an option but it would imply a separate implementation for timeout(Duration) (as opposed to the more generic timeout(Publisher))...
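
To make that layering concrete (the internals differ slightly, but this is roughly the shape; the 100ms value is just an example):

// timeout(Duration) is backed by the Publisher-based variant:
source.timeout(Duration.ofMillis(100));
// behaves roughly like
source.timeout(Mono.delay(Duration.ofMillis(100)),           // trigger before the first signal
        item -> Mono.delay(Duration.ofMillis(100)));          // a fresh trigger per onNext
// and Mono.delay uses Schedulers.parallel() unless told otherwise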

So unless we get a stronger signal that this causes issues for a majority of users, I'm not keen on dedicating much bandwidth to changing timeout(Duration).

There's an element of team prioritization affecting that decision, so we could still accept a PR.
That being said, the additional maintenance burden due to the split would warrant a benchmark quantifying the boost such a solution represents before we can definitively accept it.

@simonbasle
Member

simonbasle commented Nov 22, 2021

Interesting side note: RxJava seems to have a close approach where timeout tasks also always trigger, but the id of the "real" timeout task is captured so that an outdated timeout will be a no-op (failing the CAS on their index). That is another potential inspiration, with the same "split out the timeout(Duration) case and maintain it" drawback.
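
A minimal sketch of that idea, with illustrative names only (this is neither the actual RxJava nor the reactor-core internals):

import java.util.concurrent.atomic.AtomicLong;

final class IndexedTimeoutSketch {
    private final AtomicLong index = new AtomicLong();

    // every onNext bumps the index; the previous timeout task is NOT cancelled
    void onSignal() {
        index.incrementAndGet();
    }

    // the task scheduled while waiting for item number `armedFor`
    Runnable timeoutTask(long armedFor) {
        return () -> {
            // only the "real" timeout wins; an outdated task fails this CAS and does nothing
            if (index.compareAndSet(armedFor, Long.MIN_VALUE)) {
                fireTimeout();
            }
        };
    }

    private void fireTimeout() {
        // signal a TimeoutException or switch to the fallback publisher
    }
}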

@bruto1
Contributor Author

bruto1 commented Nov 23, 2021

A benchmark means actually implementing the new timeout and bombarding both impls with signals, then comparing throughput?

@simonbasle
Member

Yes, pretty much. I just want to set expectations here in case somebody wants to contribute: it can still get rejected if the benchmark doesn't show enough improvement.

@simonbasle simonbasle added the help wanted We need contributions on this label Nov 23, 2021
@unoexperto

@simonbasle Hi Simon. I'm the original guy from the Gitter thread. Could you please take a look at my first take on the problem to determine if it deserves a PR? Here is what I use in production now:

https://gist.github.com/unoexperto/fe6725a9bf20ff04e0cba0fbbf8a7606

I realize performance benefits are very specific to the nature of the application, but in my case I see a ~3x throughput improvement in my product. In synthetic tests of the pipeline, the original timeout() spends ~38% of its time in Mono.subscribe().

@simonbasle
Member

Thanks for providing that, @unoexperto. I had a bit of trouble comparing that code to the FluxTimeout one because of the Kotlin, but it seems very close.

I was surprised to see it still has a generic Publisher<?> for the timeout trigger (so using a Mono.delay would still use Schedulers.parallel() by default). It also only covers a subset of the timeout operator API (no generation of timeout triggers per onNext, for instance), so I don't think we can easily integrate that into the reactor codebase.

Overall, looking back at this and at the FluxTimeout code, I think my statement that we'd need to maintain a specific implementation for the time-based timeout was probably wrong. Indeed I get the feeling that the index variable that we monotonically increment in FluxTimeout is enough. We don't really need the timestamp aspect of System.nanoTime().

So the only modification we'd need after all would be that FluxTimeout.TimeoutMainSubscriber#setTimeout stops cancelling the old timeout mono.

The behavior could even be configurable, at least at the constructor level (which would facilitate benchmarking in order to get a clear picture of the throughput improvement vs the gc-pressure/pressure on the scheduler's task queue).

wdyt?

@simonbasle
Member

@unoexperto @bruto1 looks like I missed the fact that timeout isn't rescheduled in onNext but in doTimeout (which currently is imprecise, but could be improved). So with that approach we DO indeed need nanoTime and we DO need a separate time-only implementation :(

By just eliminating cancellation of old timeout triggers, we retain the generic single implementation. Question is: does that help with performance?

@bruto1
Contributor Author

bruto1 commented Dec 3, 2021

It should.
A time-only impl would be able to schedule a more precise delay() upon completion of the last one, though.

@unoexperto

@simonbasle @bruto1

By just eliminating cancellation of old timeout triggers, we retain the generic single implementation. Question is: does that help with performance?

Profiling shows that it's not the cancellation that eats CPU but the subscription to the Mono in this line. That's why, at the expense of precision, I moved rescheduling to doTimeout, once the previous timeout is finalized.

Do you think it's possible to implement a generic timeout() without exposing the concept of time to users? Perhaps .timeout() is a misleading name in the first place; semantically it's more of a .cancellation(). Thus coarseTimeout should have only one version, the one that accepts a Duration.

@simonbasle
Member

@pderop is conducting some interesting experiments with HashedWheelTimers. We do see a measurable improvement, so we might consider it. The only question is what to expose exactly, and I'm leaning towards making an internal implementation of a wheel timer at first, to only be used by the timeout(Duration) operator (time-based, default Scheduler). We can always consider exposing it as a Schedulers type of thing later on.
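
For readers unfamiliar with the data structure, here is a small illustration using Netty's public io.netty.util.HashedWheelTimer; it is only meant to show why a wheel amortizes the cost, and is not the internal implementation discussed above:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.util.concurrent.TimeUnit;

public class WheelTimerIllustration {
    public static void main(String[] args) throws InterruptedException {
        // deadlines are hashed into coarse tick buckets, so arming and cancelling a timeout
        // is an O(1) bucket operation instead of a pass through a lock-guarded delay queue
        HashedWheelTimer wheel = new HashedWheelTimer(1, TimeUnit.MILLISECONDS, 512);

        // arm a 100ms timeout for the next expected signal
        Timeout pending = wheel.newTimeout(
                t -> System.out.println("timed out, would signal TimeoutException here"),
                100, TimeUnit.MILLISECONDS);

        // the next onNext arrives in time: cancellation just marks the entry for removal
        Thread.sleep(50);
        pending.cancel();

        wheel.stop();
    }
}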

@simonbasle simonbasle added this to the 3.4.x Backlog milestone Dec 14, 2021
@pderop
Contributor

pderop commented Jan 3, 2022

@bruto1

Hi,

Out of curiosity, can you confirm whether performance is better when using the timeout operator with a "single" scheduler (Schedulers.single()) instead of the default parallel scheduler?

For example, instead of using the default parallel scheduler like:

                timeout(Duration.ofMillis(100), Mono.just(-1))

replace it with:

                timeout(Duration.ofMillis(100), Mono.just(-1), Schedulers.single())

thanks.

@bruto1
Contributor Author

bruto1 commented Jan 16, 2022

I don't actually have a mini benchmark for this, @pderop - I noticed the effect while profiling the entire service at work.
Why would single work better, though, if single() uses the same ScheduledThreadPoolExecutor?

	/**
	 * Instantiates the default {@link ScheduledExecutorService} for the SingleScheduler
	 * ({@code Executors.newScheduledThreadPoolExecutor} with core and max pool size of 1).
	 */
	@Override
	public ScheduledExecutorService get() {
		ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, this.factory);
		e.setRemoveOnCancelPolicy(true);
		e.setMaximumPoolSize(1);
		return e;
	}

But it's only one thread, instead of parallel()'s several, serving the same number of I/O threads - it should result in more contention for the work queue's lock, if anything.

@simonbasle simonbasle added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed type/enhancement A general enhancement status/need-decision This needs a decision from the team status/need-design This needs more in depth design work labels Jan 31, 2022
@simonbasle
Member

@bruto1 @pderop observed in his benchmark that there was more time spent in the implicit picking of a Worker in Schedulers.parallel(), which isn't the case with Schedulers.single(). The use of single() was thus raised as a potential easy way to improve the situation for the use case covered in this issue (a fast-producing publisher with timeouts that are not actually triggered).

It will be interesting to see if contention counterbalances that. But if using single() helps, it means we can avoid introducing a dedicated implementation for time-based timeouts, which would be preferable to me.

@bruto1
Contributor Author

bruto1 commented Jan 31, 2022

@pderop, can you please share the benchmark code?
It's better than mine because I have none (I've gotten rid of most of the blank timeouts since I filed this issue), but the results quoted by @simonbasle are counterintuitive.

@pderop
Contributor

pderop commented Jan 31, 2022

Hi @bruto1 ,

Sure, the sample project is here;
I hope it will help to track down this issue.

@a701440

a701440 commented May 15, 2023

Hello Guys,

Any update on this issue? We are using version 3.5.5.
We ran into this when using a large number of items with Mono timeout.
Try the test code below. In local tests, only 18,000 or so timeouts run within the 10-second delay at the bottom of the test,
in spite of the fact that all the Monos have a 50ms timeout and all are created and subscribed to before the 10 seconds start.

The output I get is:

start count=100000
success=0, fail=18600

@Test
public void testManyTimeouts() throws InterruptedException {
    long count = 100000;
    System.out.println("start count=" + count);
    AtomicLong successCnt = new AtomicLong();
    AtomicLong errorCount = new AtomicLong();
    for (int i = 0; i < count; i++) {
        int val = i;
        // each Mono blocks for 100ms before emitting, but carries a 50ms timeout
        Mono<Object> m = Mono.create(sink -> {
            try {
                Thread.sleep(100);
                sink.success(val);
            } catch (InterruptedException e) {
                sink.error(e);
            }
        }).timeout(Duration.ofMillis(50));
        // subscribe everything up front on boundedElastic, counting successes and failures
        m.subscribeOn(Schedulers.boundedElastic()).subscribe(v -> {
            successCnt.incrementAndGet();
        }, e -> {
            errorCount.incrementAndGet();
        });
    }
    // give the pipeline 10 seconds, then report how many Monos terminated either way
    Thread.sleep(10000);
    System.out.println("success=" + successCnt.get() + ", fail=" + errorCount.get());
}

@bruto1
Contributor Author

bruto1 commented Feb 10, 2024

Hi @simonbasle,
I've been away for a while but got back around to this issue after all (better late than never, right?).

So the single scheduler works well as long as there's only one thread scheduling and cancelling tasks on it.
If there are more, contention for the same ReentrantLock predictably goes up:
https://github.com/bruto1/test-reactor-flux-timer-benchmark

So maybe a new impl would be a good idea after all.

@chemicL chemicL modified the milestones: 3.4.x Backlog, 3.6.x Backlog Mar 18, 2024