diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 3b33ad46e57..83cec575b0e 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -53,9 +53,8 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import static io.deephaven.extensions.barrage.util.BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS; @@ -270,6 +269,31 @@ public void close() { } } + /** + * Represents states for a DoExchange stream where the server must not close until the client has half closed. + */ + enum HalfClosedState { + /** + * Client has not half-closed, server should not half close until the client has done so. + */ + DONT_CLOSE, + /** + * Indicates that the client has half-closed, and the server should half close immediately after finishing + * sending data. + */ + CLIENT_HALF_CLOSED, + + /** + * The server has no more data to send, but client hasn't half-closed. + */ + FINISHED_SENDING, + + /** + * Streaming finished and client half-closed. + */ + CLOSED + } + /** * Helper class that maintains a subscription whether it was created by a bi-directional stream request or the * no-client-streaming request. If the SubscriptionRequest sets the sequence, then it treats sequence as a watermark @@ -439,6 +463,8 @@ private void tryClose() { */ private class SnapshotRequestHandler implements Handler { + private final AtomicReference halfClosedState = + new AtomicReference<>(HalfClosedState.DONT_CLOSE); public SnapshotRequestHandler() {} @@ -493,7 +519,25 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { // leverage common code for `DoGet` and `BarrageSnapshotOptions` BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport, reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener, metrics); - listener.onCompleted(); + HalfClosedState newState = halfClosedState.updateAndGet(current -> { + switch (current) { + case DONT_CLOSE: + // record that we have finished sending + return HalfClosedState.FINISHED_SENDING; + case CLIENT_HALF_CLOSED: + // since streaming has now finished, and client already half-closed, time to + // half close from server + return HalfClosedState.CLOSED; + case FINISHED_SENDING: + case CLOSED: + throw new IllegalStateException("Can't finish streaming twice"); + default: + throw new IllegalStateException("Unknown state " + current); + } + }); + if (newState == HalfClosedState.CLOSED) { + listener.onCompleted(); + } }); } } @@ -501,6 +545,25 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { @Override public void close() { // no work to do for DoGetRequest close + // possibly safely complete if finished sending data + HalfClosedState newState = halfClosedState.updateAndGet(current -> { + switch (current) { + case DONT_CLOSE: + // record that we have half closed + return HalfClosedState.CLIENT_HALF_CLOSED; + case FINISHED_SENDING: + // since client has now half closed, and we're done sending, time to half-close from server + return HalfClosedState.CLOSED; + case CLIENT_HALF_CLOSED: + case CLOSED: + throw new IllegalStateException("Can't close twice"); + default: + throw new IllegalStateException("Unknown state " + current); + } + }); + if (newState == HalfClosedState.CLOSED) { + listener.onCompleted(); + } } }