Skip to content

Commit

Permalink
Merge reactor#2164 into 3.3.6
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed May 19, 2020
2 parents 9b14da6 + 103190f commit c543523
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void cancel() {
super.cancel();
}
else {
Operators.terminate(S, this);
super.terminate();
if (closureSubscriber != null) {
closureSubscriber.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void cancel() {
super.cancel();
}
else {
Operators.terminate(S, this);
super.terminate();
if (closureSubscriber != null) {
closureSubscriber.cancel();
}
Expand Down
96 changes: 58 additions & 38 deletions reactor-core/src/main/java/reactor/core/publisher/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -1583,53 +1583,72 @@ public int size() {
public static class DeferredSubscription
implements Subscription, Scannable {

volatile Subscription s;
static final int STATE_CANCELLED = -2;
static final int STATE_SUBSCRIBED = -1;

Subscription s;
volatile long requested;

protected boolean isCancelled(){
return s == cancelledSubscription();
return requested == STATE_CANCELLED;
}

@Override
public void cancel() {
Subscription a = s;
if (a != cancelledSubscription()) {
a = S.getAndSet(this, cancelledSubscription());
if (a != null && a != cancelledSubscription()) {
a.cancel();
}
final long state = REQUESTED.getAndSet(this, STATE_CANCELLED);
if (state == STATE_CANCELLED) {
return;
}

if (state == STATE_SUBSCRIBED) {
this.s.cancel();
}
}

protected void terminate() {
REQUESTED.getAndSet(this, STATE_CANCELLED);
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
long requested = this.requested; // volatile read to see subscription
if (key == Attr.PARENT) return s;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested < 0 ? 0 : requested;
if (key == Attr.CANCELLED) return isCancelled();

return null;
}

@Override
public void request(long n) {
Subscription a = s;
if (a != null) {
a.request(n);
}
else {
addCap(REQUESTED, this, n);

a = s;
long r = this.requested; // volatile read beforehand

if (a != null) {
long r = REQUESTED.getAndSet(this, 0L);
if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened
long u;
for (;;) { // normal CAS loop with overflow protection
if (r == Long.MAX_VALUE) { // if r == Long.MAX_VALUE then we dont care and we can loose this request just in case of racing
return;
}
u = Operators.addCap(r, n);
if (REQUESTED.compareAndSet(this, r, u)) { // Means increment happened before onSubscribe
return;
}
else { // Means increment happened after onSubscribe
r = this.requested; // update new state to see what exactly happened (onSubscribe | cancel | requestN)

if (r != 0L) {
a.request(r);
if (r < 0) { // check state (expect -1 | -2 to exit, otherwise repeat)
break;
}
}
}
}

if (r == STATE_CANCELLED) { // if canceled, just exit
return;
}

this.s.request(n); // if onSubscribe -> subscription exists (and we sure of that because volatile read after volatile write) so we can execute requestN on the subscription
}

/**
Expand All @@ -1640,8 +1659,9 @@ public void request(long n) {
*/
public final boolean set(Subscription s) {
Objects.requireNonNull(s, "s");
final long state = this.requested;
Subscription a = this.s;
if (a == cancelledSubscription()) {
if (state == STATE_CANCELLED) {
s.cancel();
return false;
}
Expand All @@ -1651,30 +1671,30 @@ public final boolean set(Subscription s) {
return false;
}

if (S.compareAndSet(this, null, s)) {
long r;
long accumulated = 0;
for (;;) {
r = this.requested;

long r = REQUESTED.getAndSet(this, 0L);

if (r != 0L) {
s.request(r);
if (r == STATE_CANCELLED || r == STATE_SUBSCRIBED) {
s.cancel();
return false;
}

return true;
}
this.s = s;

a = this.s;
long toRequest = r - accumulated;
if (toRequest > 0) { // if there is something,
s.request(toRequest); // then we do a request on the given subscription
}
accumulated += toRequest;

if (a != cancelledSubscription()) {
s.cancel();
reportSubscriptionSet();
return false;
if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) {
return true;
}
}

return false;
}

static final AtomicReferenceFieldUpdater<DeferredSubscription, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(DeferredSubscription.class, Subscription.class, "s");
static final AtomicLongFieldUpdater<DeferredSubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(DeferredSubscription.class, "requested");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,30 @@ public void scanCancelledSubscription() {
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}


@Test
public void shouldBeSerialIfRacy() {
for (int i = 0; i < 10000; i++) {
long[] requested = new long[] { 0 };
Subscription mockSubscription = Mockito.mock(Subscription.class);
Mockito.doAnswer(a -> requested[0] += (long) a.getArgument(0)).when(mockSubscription).request(Mockito.anyLong());
DeferredSubscription deferredSubscription = new DeferredSubscription();

deferredSubscription.request(5);

RaceTestUtils.race(() -> deferredSubscription.set(mockSubscription),
() -> {
deferredSubscription.request(10);
deferredSubscription.request(10);
deferredSubscription.request(10);
});

deferredSubscription.request(15);

Assertions.assertThat(requested[0]).isEqualTo(50L);
}
}

@Test
public void scanDeferredSubscription() {
DeferredSubscription test = new DeferredSubscription();
Expand Down

0 comments on commit c543523

Please sign in to comment.