Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SurroundingPublisher and HttpResponse.of(headers, pub, trailers) #4727

Merged
merged 41 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8428d0f
Add `SurroundingPublisher` and `HttpResponse.of(headers, pub, trailers)`
injae-kim Mar 6, 2023
9c99369
Introduce `SurroundingPublisher`
injae-kim Apr 3, 2023
ed38134
Remove `Prepending, AppendingPublisher`
injae-kim Apr 3, 2023
df49ae6
Remove unused field
injae-kim Apr 4, 2023
1871376
Use `HttpData` as param type for publisher
injae-kim May 15, 2023
819936a
Address comments on `SurroundingPublisher`
injae-kim May 15, 2023
5e2a95b
Make SurroundingPublisher extends SteamMessage
injae-kim May 21, 2023
c68c29a
Add `HttpRequest.of(headers, publisher, trailers`
injae-kim May 21, 2023
aa73a1f
Optimize for empty trailers
injae-kim May 22, 2023
088f67b
Address `ikhoon` comments
injae-kim Jun 4, 2023
4e2c1ce
Fix timing issue on ByteBufLeakTest
injae-kim Jun 4, 2023
314fe22
Fix checkStyle
injae-kim Jun 4, 2023
31fb027
Address `minwoox`'s comments
injae-kim Jun 14, 2023
3cb59d7
Address nit comments
injae-kim Jun 14, 2023
e858510
Make publishDownstream logic more clear
injae-kim Jun 14, 2023
a36af31
Address `ikhoon`'s comments
injae-kim Jun 15, 2023
8a435e7
Address comments
injae-kim Jun 20, 2023
807d9ae
Address `jrhee17` comments and add more unit tests
injae-kim Jun 22, 2023
d6563cb
Address `ikhoon`'s comments
injae-kim Jul 18, 2023
013ba13
Address `jrhee17` comment
injae-kim Jul 23, 2023
85e9198
Update core/src/main/java/com/linecorp/armeria/internal/common/stream…
ikhoon Jul 25, 2023
aab332b
Merge branch 'main' into appending-publisher
injae-kim Jul 26, 2023
e9bb4f6
Remove unused import
injae-kim Jul 26, 2023
ca07986
Merge branch 'main' into appending-publisher
ikhoon Jul 27, 2023
89ae854
Merge branch 'main' into appending-publisher
ikhoon Jul 31, 2023
46a461d
Add LoggingSerivce to MatrixVariablesTest
ikhoon Jul 31, 2023
97c5b1d
Merge branch 'main' into appending-publisher
injae-kim Aug 4, 2023
9305f57
Handle upstream(ChannelSendOperator)'s onNext without request
injae-kim Aug 5, 2023
a253b3e
Revert "Handle upstream(ChannelSendOperator)'s onNext without request"
ikhoon Aug 10, 2023
b6b4727
Add debug logs to ArmeriaServerHttpResponse
ikhoon Aug 10, 2023
16ed4cb
Merge branch 'main' into appending-publisher
ikhoon Aug 18, 2023
9be686d
Merge branch 'main' into appending-publisher
ikhoon Aug 18, 2023
3a3808a
add more debug
ikhoon Aug 18, 2023
98526bc
Merge branch 'main' into appending-publisher
ikhoon Aug 19, 2023
476928e
temporary disable broken tests
ikhoon Aug 19, 2023
d90b8dd
Revert "temporary disable broken tests"
ikhoon Aug 21, 2023
7cdbf4c
Revert "add more debug"
ikhoon Aug 21, 2023
37ed849
Forked `ChannelSendOperator`
ikhoon Aug 21, 2023
cf0d3c7
fix compile errors
ikhoon Aug 21, 2023
da2162d
address lint
ikhoon Aug 21, 2023
732744b
indent
ikhoon Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address ikhoon comments
  • Loading branch information
injae-kim committed Jun 4, 2023
commit 088f67b30371f9f207d3a3f5d1609e41f63a1c9f
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ static HttpRequest of(RequestHeaders headers, Publisher<? extends HttpObject> pu
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
@UnstableApi
static HttpRequest of(RequestHeaders headers,
minwoox marked this conversation as resolved.
Show resolved Hide resolved
Publisher<? extends HttpData> publisher,
HttpHeaders trailers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand All @@ -33,6 +32,7 @@
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.NoopSubscriber;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
Expand All @@ -48,7 +48,7 @@ public final class SurroundingPublisher<T> implements StreamMessage<T> {

@Nullable
private final T head;
private final Publisher<? extends T> publisher;
private final StreamMessage<T> publisher;
@Nullable
private final T tail;

Expand All @@ -58,10 +58,15 @@ public final class SurroundingPublisher<T> implements StreamMessage<T> {
@Nullable
private volatile SurroundingSubscriber<T> surroundingSubscriber;

@SuppressWarnings("unchecked")
public SurroundingPublisher(@Nullable T head, Publisher<? extends T> publisher, @Nullable T tail) {
requireNonNull(publisher, "publisher");
this.head = head;
this.publisher = publisher;
if (publisher instanceof StreamMessage) {
this.publisher = (StreamMessage<T>) publisher;
} else {
this.publisher = new PublisherBasedStreamMessage<>(publisher);
}
this.tail = tail;
}

Expand Down Expand Up @@ -118,8 +123,7 @@ private void subscribe0(Subscriber<? super T> subscriber, EventExecutor executor
SubscriptionOption... options) {

final SurroundingSubscriber<T> surroundingSubscriber = new SurroundingSubscriber<>(
head, publisher, tail, subscriber, executor,
completionFuture, containsNotifyCancellation(options));
head, publisher, tail, subscriber, executor, completionFuture, options);
this.surroundingSubscriber = surroundingSubscriber;
subscriber.onSubscribe(surroundingSubscriber);

Expand Down Expand Up @@ -154,10 +158,6 @@ public void abort(Throwable cause) {

static final class SurroundingSubscriber<T> implements Subscriber<T>, Subscription {
injae-kim marked this conversation as resolved.
Show resolved Hide resolved

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SurroundingSubscriber, State> stateUpdater =
AtomicReferenceFieldUpdater.newUpdater(SurroundingSubscriber.class, State.class, "state");

enum State {
REQUIRE_HEAD,
REQUIRE_BODY,
Expand All @@ -170,7 +170,7 @@ enum State {

@Nullable
private final T head;
private final Publisher<? extends T> publisher;
private final StreamMessage<T> publisher;
@Nullable
private final T tail;

Expand All @@ -185,11 +185,11 @@ enum State {
private volatile boolean closed;
injae-kim marked this conversation as resolved.
Show resolved Hide resolved

private final CompletableFuture<Void> completionFuture;
private final boolean notifyCancellation;
private final SubscriptionOption[] options;

SurroundingSubscriber(@Nullable T head, Publisher<? extends T> publisher, @Nullable T tail,
SurroundingSubscriber(@Nullable T head, StreamMessage<T> publisher, @Nullable T tail,
Subscriber<? super T> downstream, EventExecutor executor,
CompletableFuture<Void> completionFuture, boolean notifyCancellation) {
CompletableFuture<Void> completionFuture, SubscriptionOption... options) {
requireNonNull(publisher, "publisher");
requireNonNull(downstream, "downstream");
requireNonNull(executor, "executor");
Expand All @@ -200,7 +200,7 @@ enum State {
this.downstream = downstream;
this.executor = executor;
this.completionFuture = completionFuture;
this.notifyCancellation = notifyCancellation;
this.options = options;
}

@Override
Expand Down Expand Up @@ -253,7 +253,7 @@ private void publish() {
case REQUIRE_BODY: {
if (!subscribed) {
subscribed = true;
publisher.subscribe(this);
publisher.subscribe(this, executor, options);
return;
}
if (upstream != null) {
Expand Down Expand Up @@ -362,7 +362,7 @@ private void cancel0() {
closed = true;

final CancelledSubscriptionException cause = CancelledSubscriptionException.get();
if (notifyCancellation) {
if (containsNotifyCancellation(options)) {
downstream.onError(cause);
}
downstream = NoopSubscriber.get();
Expand Down Expand Up @@ -407,9 +407,11 @@ private void release() {
}
}

private boolean setState(State oldState, State newState) {
private void setState(State oldState, State newState) {
assert state == oldState :
"curState: " + state + ", oldState: " + oldState + ", newState: " + newState;
assert newState != State.REQUIRE_HEAD : "oldState: " + oldState + ", newState: " + newState;
return stateUpdater.compareAndSet(this, oldState, newState);
state = newState;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,12 @@ public StreamMessage<Object> createPublisher(long elements) {
return publisher;
}
if (elements == 1) {
// return new SurroundingPublisher<>("head", Mono.empty(), null);
// return new SurroundingPublisher<>(null, Mono.just(1), null);
return new SurroundingPublisher<>(null, Mono.empty(), "tail");
return new SurroundingPublisher<>("head", Mono.empty(), null);
}
if (elements == 2) {
// return new SurroundingPublisher<>("head", Mono.just(1), null);
// return new SurroundingPublisher<>("head", Mono.empty(), "tail");
// return new SurroundingPublisher<>(null, Mono.just(1), "tail");
return new SurroundingPublisher<>(null, Flux.just(1, 2), null);
return new SurroundingPublisher<>(null, Mono.just(1), "tail");
}
// return new SurroundingPublisher<>("head", Flux.fromStream(LongStream.range(0, elements - 1).boxed()), null);
// return new SurroundingPublisher<>("head", Flux.fromStream(LongStream.range(0, elements - 2).boxed()), "tail");
// return new SurroundingPublisher<>(null, Flux.fromStream(LongStream.range(0, elements - 1).boxed()), "tail");
return new SurroundingPublisher<>(null, Flux.fromStream(LongStream.range(0, elements).boxed()), null);
return new SurroundingPublisher<>("head", Flux.fromStream(LongStream.range(0, elements - 2).boxed()), "tail");
}

/**
Expand Down