Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SurroundingPublisher and HttpResponse.of(headers, pub, trailers) #4727

Merged
merged 41 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8428d0f
Add `SurroundingPublisher` and `HttpResponse.of(headers, pub, trailers)`
injae-kim Mar 6, 2023
9c99369
Introduce `SurroundingPublisher`
injae-kim Apr 3, 2023
ed38134
Remove `Prepending, AppendingPublisher`
injae-kim Apr 3, 2023
df49ae6
Remove unused field
injae-kim Apr 4, 2023
1871376
Use `HttpData` as param type for publisher
injae-kim May 15, 2023
819936a
Address comments on `SurroundingPublisher`
injae-kim May 15, 2023
5e2a95b
Make SurroundingPublisher extends SteamMessage
injae-kim May 21, 2023
c68c29a
Add `HttpRequest.of(headers, publisher, trailers`
injae-kim May 21, 2023
aa73a1f
Optimize for empty trailers
injae-kim May 22, 2023
088f67b
Address `ikhoon` comments
injae-kim Jun 4, 2023
4e2c1ce
Fix timing issue on ByteBufLeakTest
injae-kim Jun 4, 2023
314fe22
Fix checkStyle
injae-kim Jun 4, 2023
31fb027
Address `minwoox`'s comments
injae-kim Jun 14, 2023
3cb59d7
Address nit comments
injae-kim Jun 14, 2023
e858510
Make publishDownstream logic more clear
injae-kim Jun 14, 2023
a36af31
Address `ikhoon`'s comments
injae-kim Jun 15, 2023
8a435e7
Address comments
injae-kim Jun 20, 2023
807d9ae
Address `jrhee17` comments and add more unit tests
injae-kim Jun 22, 2023
d6563cb
Address `ikhoon`'s comments
injae-kim Jul 18, 2023
013ba13
Address `jrhee17` comment
injae-kim Jul 23, 2023
85e9198
Update core/src/main/java/com/linecorp/armeria/internal/common/stream…
ikhoon Jul 25, 2023
aab332b
Merge branch 'main' into appending-publisher
injae-kim Jul 26, 2023
e9bb4f6
Remove unused import
injae-kim Jul 26, 2023
ca07986
Merge branch 'main' into appending-publisher
ikhoon Jul 27, 2023
89ae854
Merge branch 'main' into appending-publisher
ikhoon Jul 31, 2023
46a461d
Add LoggingSerivce to MatrixVariablesTest
ikhoon Jul 31, 2023
97c5b1d
Merge branch 'main' into appending-publisher
injae-kim Aug 4, 2023
9305f57
Handle upstream(ChannelSendOperator)'s onNext without request
injae-kim Aug 5, 2023
a253b3e
Revert "Handle upstream(ChannelSendOperator)'s onNext without request"
ikhoon Aug 10, 2023
b6b4727
Add debug logs to ArmeriaServerHttpResponse
ikhoon Aug 10, 2023
16ed4cb
Merge branch 'main' into appending-publisher
ikhoon Aug 18, 2023
9be686d
Merge branch 'main' into appending-publisher
ikhoon Aug 18, 2023
3a3808a
add more debug
ikhoon Aug 18, 2023
98526bc
Merge branch 'main' into appending-publisher
ikhoon Aug 19, 2023
476928e
temporary disable broken tests
ikhoon Aug 19, 2023
d90b8dd
Revert "temporary disable broken tests"
ikhoon Aug 21, 2023
7cdbf4c
Revert "add more debug"
ikhoon Aug 21, 2023
37ed849
Forked `ChannelSendOperator`
ikhoon Aug 21, 2023
cf0d3c7
fix compile errors
ikhoon Aug 21, 2023
da2162d
address lint
ikhoon Aug 21, 2023
732744b
indent
ikhoon Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.internal.common.DefaultHttpRequest;
import com.linecorp.armeria.internal.common.DefaultSplitHttpRequest;
import com.linecorp.armeria.internal.common.stream.SurroundingPublisher;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.buffer.ByteBufAllocator;
Expand Down Expand Up @@ -280,6 +281,26 @@ static HttpRequest of(RequestHeaders headers, Publisher<? extends HttpObject> pu
}
}

/**
* Creates a new instance from an existing {@link RequestHeaders}, {@link Publisher} and trailers.
*
* <p>Note that the {@link HttpData}s in the {@link Publisher} are not released when
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
@UnstableApi
static HttpRequest of(RequestHeaders headers,
minwoox marked this conversation as resolved.
Show resolved Hide resolved
Publisher<? extends HttpData> publisher,
HttpHeaders trailers) {
requireNonNull(headers, "headers");
requireNonNull(publisher, "publisher");
requireNonNull(trailers, "trailers");
if (trailers.isEmpty()) {
return of(headers, publisher);
}
return of(headers, new SurroundingPublisher<>(null, publisher, trailers));
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns a new {@link HttpRequestBuilder}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,23 @@ static HttpResponse of(ResponseHeaders headers, Publisher<? extends HttpObject>
return PublisherBasedHttpResponse.from(headers, publisher);
}

/**
* Creates a new HTTP response with the specified headers and trailers
* whose stream is produced from an existing {@link Publisher}.
*
* <p>Note that the {@link HttpData}s in the {@link Publisher} are not released when
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
static HttpResponse of(ResponseHeaders headers,
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
Publisher<? extends HttpData> publisher,
HttpHeaders trailers) {
requireNonNull(headers, "headers");
requireNonNull(publisher, "publisher");
requireNonNull(trailers, "trailers");
return PublisherBasedHttpResponse.from(headers, publisher, trailers);
}

/**
* Creates a new HTTP response with the specified {@code content} that is converted into JSON using the
* default {@link ObjectMapper}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.errorprone.annotations.FormatString;

import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.stream.StreamMessage;

/**
* Builds a new {@link HttpResponse}.
Expand Down Expand Up @@ -299,8 +298,7 @@ public HttpResponse build() {
if (trailers == null) {
return HttpResponse.of(responseHeaders, publisher);
} else {
return HttpResponse.of(responseHeaders,
StreamMessage.concat(publisher, StreamMessage.of(trailers.build())));
return HttpResponse.of(responseHeaders, publisher, trailers.build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@
import org.reactivestreams.Publisher;

import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.internal.common.stream.PrependingPublisher;
import com.linecorp.armeria.internal.common.stream.SurroundingPublisher;

final class PublisherBasedHttpResponse extends PublisherBasedStreamMessage<HttpObject> implements HttpResponse {

static PublisherBasedHttpResponse from(ResponseHeaders headers, Publisher<? extends HttpObject> publisher) {
return new PublisherBasedHttpResponse(new PrependingPublisher<>(headers, publisher));
return new PublisherBasedHttpResponse(new SurroundingPublisher<>(headers, publisher, null));
}

static PublisherBasedHttpResponse from(ResponseHeaders headers,
Publisher<? extends HttpData> publisher,
HttpHeaders trailers) {
if (trailers.isEmpty()) {
return from(headers, publisher);
}
return new PublisherBasedHttpResponse(new SurroundingPublisher<>(headers, publisher, trailers));
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
}

PublisherBasedHttpResponse(Publisher<? extends HttpObject> publisher) {
Expand Down

This file was deleted.

Loading
Loading