Skip to content

Commit

Permalink
Don't leak the viewport subscription when the viewport data is read (d…
Browse files Browse the repository at this point in the history
…eephaven#4420)

Also closes other streams after the client is closed.

Fixes deephaven#4410
  • Loading branch information
niloc132 committed Oct 25, 2023
1 parent 437cd15 commit de313bb
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,26 +741,27 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column

/**
* Gets the currently visible viewport. If the current set of operations has not yet resulted in data, it will not
* resolve until that data is ready.
* resolve until that data is ready. If this table is closed before the promise resolves, it will be rejected - to
* separate the lifespan of this promise from the table itself, call
* {@link TableViewportSubscription#getViewportData()} on the result from {@link #setViewport(double, double)}.
*
* @return Promise of {@link TableData}
*
*/
@JsMethod
public Promise<TableData> getViewportData() {
TableViewportSubscription subscription = subscriptions.get(getHandle());
if (subscription == null) {
return Promise.reject("No viewport currently set");
}
return subscription.getViewportData();
return subscription.getInternalViewportData();
}

public Promise<TableData> getInternalViewportData() {
final LazyPromise<TableData> promise = new LazyPromise<>();
final ClientTableState active = state();
active.onRunning(state -> {
if (currentViewportData == null) {
// no viewport data received yet; let's setup a one-shot UPDATED event listener
// no viewport data received yet; let's set up a one-shot UPDATED event listener
addEventListenerOneShot(EVENT_UPDATED, ignored -> promise.succeed(currentViewportData));
} else {
promise.succeed(currentViewportData);
Expand Down Expand Up @@ -1961,8 +1962,7 @@ public void setState(final ClientTableState state) {
&& existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
JsLog.debug("closing old viewport", state(), existingSubscription.state());
// with the replacement state successfully running, we can shut down the old viewport (unless
// something
// external retained it)
// something external retained it)
existingSubscription.internalClose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ReleaseRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.SessionServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.UnaryResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.storage_pb_service.StorageServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.ApplyPreviewColumnsRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.EmptyTableRequest;
Expand Down Expand Up @@ -179,7 +180,6 @@ private enum State {
private List<Callback<Void, String>> onOpen = new ArrayList<>();

private State state;
private double killTimerCancelation;
private SessionServiceClient sessionServiceClient;
private TableServiceClient tableServiceClient;
private ConsoleServiceClient consoleServiceClient;
Expand Down Expand Up @@ -209,6 +209,8 @@ private enum State {
private JsConsumer<LogItem> recordLog = pastLogs::add;
private ResponseStreamWrapper<LogSubscriptionData> logStream;

private UnaryResponse terminationStream;

private final JsSet<JsConsumer<JsVariableChanges>> fieldUpdatesCallback = new JsSet<>();
private Map<String, JsVariableDefinition> knownFields = new HashMap<>();
private ResponseStreamWrapper<FieldsChangeUpdate> fieldsChangeUpdateStream;
Expand Down Expand Up @@ -386,6 +388,9 @@ private boolean checkStatus(ResponseStreamWrapper.ServiceError fail) {
}

public boolean checkStatus(ResponseStreamWrapper.Status status) {
if (state == State.Disconnected) {
return false;
}
if (status.isOk()) {
// success, ignore
return true;
Expand Down Expand Up @@ -523,26 +528,31 @@ private Promise<Void> authUpdate() {
}

private void subscribeToTerminationNotification() {
sessionServiceClient.terminationNotification(new TerminationNotificationRequest(), metadata(),
(fail, success) -> {
if (fail != null) {
// Errors are treated like connection issues, won't signal any shutdown
if (checkStatus((ResponseStreamWrapper.ServiceError) fail)) {
// restart the termination notification
subscribeToTerminationNotification();
} else {
info.notifyConnectionError(Js.cast(fail));
connectionLost();
}
return;
}
assert success != null;
terminationStream =
sessionServiceClient.terminationNotification(new TerminationNotificationRequest(), metadata(),
(fail, success) -> {
if (state == State.Disconnected) {
// already disconnected, no need to respond
return;
}
if (fail != null) {
// Errors are treated like connection issues, won't signal any shutdown
if (checkStatus((ResponseStreamWrapper.ServiceError) fail)) {
// restart the termination notification
subscribeToTerminationNotification();
} else {
info.notifyConnectionError(Js.cast(fail));
connectionLost();
}
return;
}
assert success != null;

// welp; the server is gone -- let everyone know
connectionLost();
// welp; the server is gone -- let everyone know
connectionLost();

info.notifyServerShutdown(success);
});
info.notifyServerShutdown(success);
});
}

// @Override
Expand Down Expand Up @@ -644,9 +654,26 @@ public void forceClose() {
// explicitly mark as disconnected so reconnect isn't attempted
state = State.Disconnected;

// forcibly clean up the log stream and its listeners
if (logStream != null) {
logStream.cancel();
logStream = null;
}
pastLogs.clear();
logCallbacks.clear();

// Stop server streams, will not reconnect
if (terminationStream != null) {
terminationStream.cancel();
terminationStream = null;
}
if (exportNotifications != null) {
exportNotifications.cancel();
exportNotifications = null;
}

newSessionReconnect.disconnected();
DomGlobal.clearTimeout(killTimerCancelation);
DomGlobal.clearTimeout(scheduledAuthUpdate);
}

public void setSessionTimeoutMs(double sessionTimeoutMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void close() {
* forwarding events and optionally close the underlying table/subscription.
*/
public void internalClose() {
// indicate that the base table shouldn't get events any more, even if it this is still retained elsewhere
// indicate that the base table shouldn't get events anymore, even if it is still retained elsewhere
originalActive = false;

if (retained || status == Status.DONE) {
Expand Down

0 comments on commit de313bb

Please sign in to comment.