Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Processors from public API #3051

Merged
merged 12 commits into from
May 24, 2022
Merged
Prev Previous commit
Next Next commit
Rename ReplayProcessor to SinkManyReplayProcessor
  • Loading branch information
simonbasle committed May 24, 2022
commit 5e31fe61df66fa0569ec20bc422b314aa1e5ad95
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

Expand All @@ -51,12 +50,12 @@
*
* @param <T> the value type
*/
final class ReplayProcessor<T> extends Flux<T> implements InternalManySink<T>, CoreSubscriber<T>, ContextHolder, Disposable, Fuseable, Scannable {
final class SinkManyReplayProcessor<T> extends Flux<T> implements InternalManySink<T>, CoreSubscriber<T>, ContextHolder, Disposable, Fuseable, Scannable {

//TODO make it a ManyUpstreamAdapter as well? (must be done in 3.4.x first)

/**
* Create a {@link ReplayProcessor} that caches the last element it has pushed,
* Create a {@link SinkManyReplayProcessor} that caches the last element it has pushed,
* replaying it to late subscribers. This is a buffer-based ReplayProcessor with
* a history size of 1.
* <p>
Expand All @@ -65,15 +64,15 @@ final class ReplayProcessor<T> extends Flux<T> implements InternalManySink<T>, C
*
* @param <T> the type of the pushed elements
*
* @return a new {@link ReplayProcessor} that replays its last pushed element to each new
* @return a new {@link SinkManyReplayProcessor} that replays its last pushed element to each new
* {@link Subscriber}
*/
static <T> ReplayProcessor<T> cacheLast() {
static <T> SinkManyReplayProcessor<T> cacheLast() {
return cacheLastOrDefault(null);
}

/**
* Create a {@link ReplayProcessor} that caches the last element it has pushed,
* Create a {@link SinkManyReplayProcessor} that caches the last element it has pushed,
* replaying it to late subscribers. If a {@link Subscriber} comes in <b>before</b>
* any value has been pushed, then the {@code defaultValue} is emitted instead.
* This is a buffer-based ReplayProcessor with a history size of 1.
Expand All @@ -85,64 +84,64 @@ static <T> ReplayProcessor<T> cacheLast() {
* cached yet.
* @param <T> the type of the pushed elements
*
* @return a new {@link ReplayProcessor} that replays its last pushed element to each new
* @return a new {@link SinkManyReplayProcessor} that replays its last pushed element to each new
* {@link Subscriber}, or a default one if nothing was pushed yet
*/
static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value) {
ReplayProcessor<T> b = create(1);
static <T> SinkManyReplayProcessor<T> cacheLastOrDefault(@Nullable T value) {
SinkManyReplayProcessor<T> b = create(1);
if (value != null) {
b.onNext(value);
}
return b;
}

/**
* Create a new {@link ReplayProcessor} that replays an unbounded number of elements,
* Create a new {@link SinkManyReplayProcessor} that replays an unbounded number of elements,
* using a default internal {@link Queues#SMALL_BUFFER_SIZE Queue}.
*
* @param <E> the type of the pushed elements
*
* @return a new {@link ReplayProcessor} that replays the whole history to each new
* @return a new {@link SinkManyReplayProcessor} that replays the whole history to each new
* {@link Subscriber}.
*/
static <E> ReplayProcessor<E> create() {
static <E> SinkManyReplayProcessor<E> create() {
return create(Queues.SMALL_BUFFER_SIZE, true);
}

/**
* Create a new {@link ReplayProcessor} that replays up to {@code historySize}
* Create a new {@link SinkManyReplayProcessor} that replays up to {@code historySize}
* elements.
*
* @param historySize the backlog size, ie. maximum items retained for replay.
* @param <E> the type of the pushed elements
*
* @return a new {@link ReplayProcessor} that replays a limited history to each new
* @return a new {@link SinkManyReplayProcessor} that replays a limited history to each new
* {@link Subscriber}.
*/
static <E> ReplayProcessor<E> create(int historySize) {
static <E> SinkManyReplayProcessor<E> create(int historySize) {
return create(historySize, false);
}

/**
* Create a new {@link ReplayProcessor} that either replay all the elements or a
* Create a new {@link SinkManyReplayProcessor} that either replay all the elements or a
* limited amount of elements depending on the {@code unbounded} parameter.
*
* @param historySize maximum items retained if bounded, or initial link size if unbounded
* @param unbounded true if "unlimited" data store must be supplied
* @param <E> the type of the pushed elements
*
* @return a new {@link ReplayProcessor} that replays the whole history to each new
* @return a new {@link SinkManyReplayProcessor} that replays the whole history to each new
* {@link Subscriber} if configured as unbounded, a limited history otherwise.
*/
static <E> ReplayProcessor<E> create(int historySize, boolean unbounded) {
static <E> SinkManyReplayProcessor<E> create(int historySize, boolean unbounded) {
FluxReplay.ReplayBuffer<E> buffer;
if (unbounded) {
buffer = new FluxReplay.UnboundedReplayBuffer<>(historySize);
}
else {
buffer = new FluxReplay.SizeBoundReplayBuffer<>(historySize);
}
return new ReplayProcessor<>(buffer);
return new SinkManyReplayProcessor<>(buffer);
}

/**
Expand All @@ -169,9 +168,9 @@ static <E> ReplayProcessor<E> create(int historySize, boolean unbounded) {
* @param <T> the type of items observed and emitted by the Processor
* @param maxAge the maximum age of the contained items
*
* @return a new {@link ReplayProcessor} that replays elements based on their age.
* @return a new {@link SinkManyReplayProcessor} that replays elements based on their age.
*/
static <T> ReplayProcessor<T> createTimeout(Duration maxAge) {
static <T> SinkManyReplayProcessor<T> createTimeout(Duration maxAge) {
return createTimeout(maxAge, Schedulers.parallel());
}

Expand Down Expand Up @@ -199,9 +198,9 @@ static <T> ReplayProcessor<T> createTimeout(Duration maxAge) {
* @param <T> the type of items observed and emitted by the Processor
* @param maxAge the maximum age of the contained items
*
* @return a new {@link ReplayProcessor} that replays elements based on their age.
* @return a new {@link SinkManyReplayProcessor} that replays elements based on their age.
*/
static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler) {
static <T> SinkManyReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler) {
return createSizeAndTimeout(Integer.MAX_VALUE, maxAge, scheduler);
}

Expand Down Expand Up @@ -231,10 +230,10 @@ static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler
* @param maxAge the maximum age of the contained items
* @param size the maximum number of buffered items
*
* @return a new {@link ReplayProcessor} that replay up to {@code size} elements, but
* @return a new {@link SinkManyReplayProcessor} that replay up to {@code size} elements, but
* will evict them from its history based on their age.
*/
static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge) {
static <T> SinkManyReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge) {
return createSizeAndTimeout(size, maxAge, Schedulers.parallel());
}

Expand Down Expand Up @@ -264,17 +263,17 @@ static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge) {
* @param size the maximum number of buffered items
* @param scheduler the {@link Scheduler} that provides the current time
*
* @return a new {@link ReplayProcessor} that replay up to {@code size} elements, but
* @return a new {@link SinkManyReplayProcessor} that replay up to {@code size} elements, but
* will evict them from its history based on their age.
*/
static <T> ReplayProcessor<T> createSizeAndTimeout(int size,
Duration maxAge,
Scheduler scheduler) {
static <T> SinkManyReplayProcessor<T> createSizeAndTimeout(int size,
Duration maxAge,
Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
if (size <= 0) {
throw new IllegalArgumentException("size > 0 required but it was " + size);
}
return new ReplayProcessor<>(new FluxReplay.SizeAndTimeBoundReplayBuffer<>(size,
return new SinkManyReplayProcessor<>(new FluxReplay.SizeAndTimeBoundReplayBuffer<>(size,
maxAge.toNanos(),
scheduler));
}
Expand All @@ -285,12 +284,12 @@ static <T> ReplayProcessor<T> createSizeAndTimeout(int size,

volatile FluxReplay.ReplaySubscription<T>[] subscribers;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<ReplayProcessor, FluxReplay.ReplaySubscription[]>
SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(ReplayProcessor.class,
static final AtomicReferenceFieldUpdater<SinkManyReplayProcessor, FluxReplay.ReplaySubscription[]>
SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(SinkManyReplayProcessor.class,
FluxReplay.ReplaySubscription[].class,
"subscribers");

ReplayProcessor(FluxReplay.ReplayBuffer<T> buffer) {
SinkManyReplayProcessor(FluxReplay.ReplayBuffer<T> buffer) {
this.buffer = buffer;
SUBSCRIBERS.lazySet(this, EMPTY);
}
Expand Down Expand Up @@ -494,7 +493,7 @@ static final class ReplayInner<T>

final CoreSubscriber<? super T> actual;

final ReplayProcessor<T> parent;
final SinkManyReplayProcessor<T> parent;

final FluxReplay.ReplayBuffer<T> buffer;

Expand All @@ -519,7 +518,7 @@ static final class ReplayInner<T>
int fusionMode;

ReplayInner(CoreSubscriber<? super T> actual,
ReplayProcessor<T> parent) {
SinkManyReplayProcessor<T> parent) {
this.actual = actual;
this.parent = parent;
this.buffer = parent.buffer;
Expand Down
36 changes: 18 additions & 18 deletions reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,47 +129,47 @@ public <T> Many<T> directBestEffort() {

@Override
public <T> Many<T> all() {
return ReplayProcessor.create();
return SinkManyReplayProcessor.create();
}

@Override
public <T> Many<T> all(int batchSize) {
return ReplayProcessor.create(batchSize);
return SinkManyReplayProcessor.create(batchSize);
}

@Override
public <T> Many<T> latest() {
return ReplayProcessor.cacheLast();
return SinkManyReplayProcessor.cacheLast();
}

@Override
public <T> Many<T> latestOrDefault(T value) {
return ReplayProcessor.cacheLastOrDefault(value);
return SinkManyReplayProcessor.cacheLastOrDefault(value);
}

@Override
public <T> Many<T> limit(int historySize) {
return ReplayProcessor.create(historySize);
return SinkManyReplayProcessor.create(historySize);
}

@Override
public <T> Many<T> limit(Duration maxAge) {
return ReplayProcessor.createTimeout(maxAge);
return SinkManyReplayProcessor.createTimeout(maxAge);
}

@Override
public <T> Many<T> limit(Duration maxAge, Scheduler scheduler) {
return ReplayProcessor.createTimeout(maxAge, scheduler);
return SinkManyReplayProcessor.createTimeout(maxAge, scheduler);
}

@Override
public <T> Many<T> limit(int historySize, Duration maxAge) {
return ReplayProcessor.createSizeAndTimeout(historySize, maxAge);
return SinkManyReplayProcessor.createSizeAndTimeout(historySize, maxAge);
}

@Override
public <T> Many<T> limit(int historySize, Duration maxAge, Scheduler scheduler) {
return ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
return SinkManyReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
}
}

Expand Down Expand Up @@ -253,28 +253,28 @@ public <T> Many<T> directBestEffort() {
@Override
public <T> Many<T> all() {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.create();
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.create();
return wrapMany(original);
}

@Override
public <T> Many<T> all(int batchSize) {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5
final ReplayProcessor<T> original = ReplayProcessor.create(batchSize, true);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.create(batchSize, true);
return wrapMany(original);
}

@Override
public <T> Many<T> latest() {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.cacheLast();
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.cacheLast();
return wrapMany(original);
}

@Override
public <T> Many<T> latestOrDefault(T value) {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.cacheLastOrDefault(value);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.cacheLastOrDefault(value);
return wrapMany(original);
}

Expand All @@ -284,21 +284,21 @@ public <T> Many<T> limit(int historySize) {
throw new IllegalArgumentException("historySize must be > 0");
}
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.create(historySize);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.create(historySize);
return wrapMany(original);
}

@Override
public <T> Many<T> limit(Duration maxAge) {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.createTimeout(maxAge);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.createTimeout(maxAge);
return wrapMany(original);
}

@Override
public <T> Many<T> limit(Duration maxAge, Scheduler scheduler) {
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.createTimeout(maxAge, scheduler);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.createTimeout(maxAge, scheduler);
return wrapMany(original);
}

Expand All @@ -308,7 +308,7 @@ public <T> Many<T> limit(int historySize, Duration maxAge) {
throw new IllegalArgumentException("historySize must be > 0");
}
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.createSizeAndTimeout(historySize, maxAge);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.createSizeAndTimeout(historySize, maxAge);
return wrapMany(original);
}

Expand All @@ -318,7 +318,7 @@ public <T> Many<T> limit(int historySize, Duration maxAge, Scheduler scheduler)
throw new IllegalArgumentException("historySize must be > 0");
}
@SuppressWarnings("deprecation") // ReplayProcessor will be removed in 3.5.
final ReplayProcessor<T> original = ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
final SinkManyReplayProcessor<T> original = SinkManyReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
return wrapMany(original);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* {@link org.reactivestreams.Processor} extending {@link reactor.core.publisher.SinkManyAbstractBase} are available:
* <ul>
* <li>A synchronous/non-opinionated pub-sub replaying capable event emitter :
* {@link reactor.core.publisher.EmitterProcessor},
* {@link reactor.core.publisher.ReplayProcessor},
* {@link reactor.core.publisher.SinkManyEmitterProcessor},
* {@link reactor.core.publisher.SinkManyReplayProcessor},
* {@link reactor.core.publisher.SinkManyUnicast} and
* {@link reactor.core.publisher.DirectProcessor}</li>
* <li>{@link reactor.core.publisher.SinkManyAbstractBase} itself offers factories to build arbitrary {@link org.reactivestreams.Processor}</li>
Expand All @@ -40,6 +40,7 @@
**
* @author Stephane Maldini
*/
//FIXME remove processors from package-info
@NonNullApi
package reactor.core.publisher;

Expand Down
Loading