Skip to content

Commit

Permalink
Merge reactor#3039 into 3.5.0-M3
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed May 17, 2022
2 parents 95d4340 + f9a1e3f commit df31c60
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 11 deletions.
98 changes: 88 additions & 10 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.core.scheduler;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -36,6 +37,7 @@
import java.util.function.Supplier;

import io.micrometer.core.instrument.MeterRegistry;

import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
Expand Down Expand Up @@ -103,9 +105,6 @@ public abstract class Schedulers {
.map(Integer::parseInt)
.orElse(100000);

@Nullable
static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;

/**
* Create a {@link Scheduler} which uses a backing {@link Executor} to schedule
* Runnables for async operators.
Expand Down Expand Up @@ -508,18 +507,68 @@ public static Scheduler newSingle(ThreadFactory threadFactory) {
}

/**
* Define a hook that is executed when a {@link Scheduler} has
* Define a hook anonymous part that is executed alongside keyed parts when a {@link Scheduler} has
* {@link #handleError(Throwable) handled an error}. Note that it is executed after
* the error has been passed to the thread uncaughtErrorHandler, which is not the
* case when a fatal error occurs (see {@link Exceptions#throwIfJvmFatal(Throwable)}).
* <p>
* This variant uses an internal private key, which allows the method to be additive with
* {@link #onHandleError(String, BiConsumer)}. Prefer adding and removing handler parts
* for keys that you own via {@link #onHandleError(String, BiConsumer)} nonetheless.
*
* @param c the new hook to set.
* @param subHook the new {@link BiConsumer} to set as the hook's anonymous part.
* @see #onHandleError(String, BiConsumer)
*/
public static void onHandleError(BiConsumer<Thread, ? super Throwable> c) {
public static void onHandleError(BiConsumer<Thread, ? super Throwable> subHook) {
Objects.requireNonNull(subHook, "onHandleError");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Hooking new default: onHandleError");
LOGGER.debug("Hooking onHandleError anonymous part");
}
synchronized (LOGGER) {
onHandleErrorHooks.put(Schedulers.class.getName() + ".ON_HANDLE_ERROR_ANONYMOUS_PART", (BiConsumer<Thread, Throwable>) subHook);
onHandleErrorHook = createOrAppendHandleError(onHandleErrorHooks.values());
}
onHandleErrorHook = Objects.requireNonNull(c, "onHandleError");
}

/**
* Define a keyed hook part that is executed alongside other parts when a {@link Scheduler} has
* {@link #handleError(Throwable) handled an error}. Note that it is executed after
* the error has been passed to the thread uncaughtErrorHandler, which is not the
* case when a fatal error occurs (see {@link Exceptions#throwIfJvmFatal(Throwable)}).
* <p>
* Calling this method twice with the same key replaces the old hook part
* of the same key. Calling this method twice with two different keys is otherwise additive.
* Note that {@link #onHandleError(BiConsumer)} also defines an anonymous part which
* effectively uses a private internal key, making it also additive with this method.
*
* @param key the {@link String} key identifying the hook part to set/replace.
* @param subHook the new hook part to set for the given key.
*/
@SuppressWarnings("unchecked")
public static void onHandleError(String key, BiConsumer<Thread, ? super Throwable> subHook) {
Objects.requireNonNull(key, "key");
Objects.requireNonNull(subHook, "onHandleError");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Hooking onHandleError part with key {}", key);
}
synchronized (LOGGER) {
onHandleErrorHooks.put(key, (BiConsumer<Thread, Throwable>) subHook);
onHandleErrorHook = createOrAppendHandleError(onHandleErrorHooks.values());
}
}

@Nullable
private static BiConsumer<Thread, ? super Throwable> createOrAppendHandleError(Collection<BiConsumer<Thread, Throwable>> subHooks) {
BiConsumer<Thread, Throwable> composite = null;
for (BiConsumer<Thread, Throwable> value : subHooks) {
if (composite != null) {
composite = composite.andThen(value);
}
else {
composite = value;
}
}
return composite;
}

/**
Expand Down Expand Up @@ -636,13 +685,37 @@ public static void resetFrom(@Nullable Snapshot snapshot) {
}

/**
* Reset the {@link #onHandleError(BiConsumer)} hook to the default no-op behavior.
* Reset the {@link #onHandleError(BiConsumer)} hook to the default no-op behavior, erasing
* all sub-hooks that might have individually added via {@link #onHandleError(String, BiConsumer)}
* or the whole hook set via {@link #onHandleError(BiConsumer)}.
*
* @see #resetOnHandleError(String)
*/
public static void resetOnHandleError() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reset to factory defaults: onHandleError");
}
onHandleErrorHook = null;
synchronized (LOGGER) {
onHandleErrorHooks.clear();
onHandleErrorHook = null;
}
}

/**
* Reset a specific onHandleError hook part keyed to the provided {@link String},
* removing that sub-hook if it has previously been defined via {@link #onHandleError(String, BiConsumer)}.
*/
public static void resetOnHandleError(String key) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remove onHandleError sub-hook {}", key);
}
synchronized (LOGGER) {
//avoid resetting monolithic hook if no keyed hook has been set
//also avoid resetting anything if the key is unknown
if (onHandleErrorHooks.remove(key) != null) {
onHandleErrorHook = createOrAppendHandleError(onHandleErrorHooks.values());
}
}
}

/**
Expand Down Expand Up @@ -1016,6 +1089,11 @@ public void dispose() {

static volatile Factory factory = DEFAULT;

private static final LinkedHashMap<String, BiConsumer<Thread, Throwable>> onHandleErrorHooks = new LinkedHashMap<>(1);

@Nullable
static BiConsumer<Thread, ? super Throwable> onHandleErrorHook;

private static final LinkedHashMap<String, Function<Runnable, Runnable>> onScheduleHooks = new LinkedHashMap<>(1);

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import reactor.test.AutoDisposingExtension;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -40,6 +41,7 @@ public class SchedulersHooksTest {
@AfterEach
public void resetAllHooks() {
Schedulers.resetOnScheduleHooks();
Schedulers.resetOnHandleError();
}

@Test
Expand Down Expand Up @@ -145,6 +147,102 @@ public void onSchedulesAreOrdered() throws Exception {
);
}

@Test
void onHandleErrorWithKeyIsAdditive() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(1));
Schedulers.onHandleError("k2", (t, error) -> tracker.addAndGet(10));
Schedulers.onHandleError("k3", (t, error) -> tracker.addAndGet(100));

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).as("3 handlers invoked").hasValue(111);
}

@Test
void onHandleErrorWithKeyReplaces() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(1));
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(10));
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(100));

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).as("last k1 invoked").hasValue(100);
}

@Test
void onHandleErrorWithNoKeyIsAdditiveWithOtherKeys() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(1));
Schedulers.onHandleError("k2", (t, error) -> tracker.addAndGet(10));
Schedulers.onHandleError("k3", (t, error) -> tracker.addAndGet(100));

Schedulers.handleError(new IllegalStateException("expected1"));

assertThat(tracker).as("3 handlers invoked prior to setting anonymous hook part").hasValue(111);
tracker.set(0);

Schedulers.onHandleError((t, error) -> tracker.addAndGet(1000));
Schedulers.handleError(new IllegalStateException("expected2"));

assertThat(tracker).as("4 handlers invoked after anonymous hook part").hasValue(1111);
}

@Test
void onHandleErrorWithKeyIgnoresUnknownRemovals() {
assertThatCode(() -> Schedulers.resetOnHandleError("k1"))
.doesNotThrowAnyException();
}

@Test
void onHandleErrorResetOneKey() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(1));
Schedulers.onHandleError("k2", (t, error) -> tracker.addAndGet(10));
Schedulers.onHandleError("k3", (t, error) -> tracker.addAndGet(100));
Schedulers.resetOnHandleError("k2");

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).hasValue(101);
}

@Test
void onHandleErrorResetAll() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError("k1", (t, error) -> tracker.addAndGet(1));
Schedulers.onHandleError("k2", (t, error) -> tracker.addAndGet(10));
Schedulers.onHandleError("k3", (t, error) -> tracker.addAndGet(100));
Schedulers.resetOnHandleError();

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).hasValue(0);
}

@Test
void onHandleErrorResetAnonymousHook() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError((t, error) -> tracker.addAndGet(100));
Schedulers.resetOnHandleError();

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).hasValue(0);
}

@Test
void onHandleErrorWithUnknownSubKeyDoesntResetAnonymousHook() {
AtomicInteger tracker = new AtomicInteger();
Schedulers.onHandleError((t, error) -> tracker.addAndGet(100));
Schedulers.resetOnHandleError("k1");

Schedulers.handleError(new IllegalStateException("expected"));

assertThat(tracker).hasValue(100);
}

private static class TrackingDecorator implements Function<Runnable, Runnable> {
final AtomicInteger tracker;
final int dx;
Expand Down

0 comments on commit df31c60

Please sign in to comment.