Skip to content

Commit

Permalink
C++ Client: Add UpdateBy Support (deephaven#4255)
Browse files Browse the repository at this point in the history
* C++ Client: Add UpdateBy Support

* Respond to review feedback
  • Loading branch information
kosak committed Aug 2, 2023
1 parent 51b88b3 commit decee54
Show file tree
Hide file tree
Showing 18 changed files with 1,512 additions and 30 deletions.
5 changes: 5 additions & 0 deletions cpp-client/deephaven/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(ALL_FILES
src/impl/string_expression_impl.cc
src/impl/table_handle_impl.cc
src/impl/table_handle_manager_impl.cc
src/impl/update_by_operation_impl.cc

include/private/deephaven/client/impl/aggregate_impl.h
include/private/deephaven/client/impl/boolean_expression_impl.h
Expand All @@ -42,6 +43,7 @@ set(ALL_FILES
include/private/deephaven/client/impl/string_expression_impl.h
include/private/deephaven/client/impl/table_handle_impl.h
include/private/deephaven/client/impl/table_handle_manager_impl.h
include/private/deephaven/client/impl/update_by_operation_impl.h
include/private/deephaven/client/impl/util.h

include/private/deephaven/client/arrowutil/arrow_column_source.h
Expand All @@ -53,15 +55,18 @@ set(ALL_FILES
src/client_options.cc
src/client.cc
src/flight.cc
src/update_by.cc

include/public/deephaven/client/columns.h
include/public/deephaven/client/expressions.h
include/public/deephaven/client/client.h
include/public/deephaven/client/client_options.h
include/public/deephaven/client/flight.h
include/public/deephaven/client/update_by.h
include/public/deephaven/client/utility/arrow_util.h

src/subscription/subscribe_thread.cc

include/private/deephaven/client/subscription/subscribe_thread.h
include/private/deephaven/client/subscription/subscription_handle.h

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "deephaven/client/client.h"
#include "deephaven/client/server/server.h"
#include "deephaven/client/subscription/subscription_handle.h"
#include "deephaven/client/update_by.h"
#include "deephaven/client/utility/executor.h"
#include "deephaven/dhcore/clienttable/schema.h"
#include "deephaven/dhcore/ticking/ticking.h"
Expand Down Expand Up @@ -115,7 +116,7 @@ class LazyState final {
*/
void waitUntilReady();

const LazyStateInfo &info();
const LazyStateInfo &info() const;

private:
std::shared_ptr<Server> server_;
Expand All @@ -140,6 +141,7 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
typedef deephaven::client::impl::BooleanExpressionImpl BooleanExpressionImpl;
typedef deephaven::client::subscription::SubscriptionHandle SubscriptionHandle;
typedef deephaven::client::utility::Executor Executor;
typedef deephaven::dhcore::clienttable::Schema Schema;
typedef deephaven::dhcore::ticking::TickingCallback TickingCallback;
typedef deephaven::dhcore::ElementTypeId ElementTypeId;
typedef io::deephaven::proto::backplane::grpc::AsOfJoinTablesRequest AsOfJoinTablesRequest;
Expand Down Expand Up @@ -209,6 +211,9 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
const TableHandleImpl &rightSide, std::vector<std::string> columnsToMatch,
std::vector<std::string> columnsToAdd);

std::shared_ptr<TableHandleImpl> updateBy(std::vector<std::shared_ptr<UpdateByOperationImpl>> ops,
std::vector<std::string> by);

std::vector<std::shared_ptr<ColumnImpl>> getColumnImpls();
std::shared_ptr<StrColImpl> getStrColImpl(std::string columnName);
std::shared_ptr<NumColImpl> getNumColImpl(std::string columnName);
Expand All @@ -226,8 +231,9 @@ class TableHandleImpl : public std::enable_shared_from_this<TableHandleImpl> {
*/
void observe();

int64_t numRows();
bool isStatic();
int64_t numRows() const;
bool isStatic() const;
std::shared_ptr<Schema> schema() const;

const std::shared_ptr<TableHandleManagerImpl> &managerImpl() const { return managerImpl_; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#pragma once

#include "deephaven/proto/table.pb.h"
#include "deephaven/proto/table.grpc.pb.h"

namespace deephaven::client::impl {
class UpdateByOperationImpl {
typedef io::deephaven::proto::backplane::grpc::UpdateByRequest::UpdateByOperation UpdateByOperationProto;
public:
explicit UpdateByOperationImpl(UpdateByOperationProto grpcOp);
UpdateByOperationImpl(UpdateByOperationImpl &&other) noexcept;
UpdateByOperationImpl &operator=(UpdateByOperationImpl &&other) noexcept;
~UpdateByOperationImpl();

[[nodiscard]] const UpdateByOperationProto &updateByProto() const { return updateByProto_; }

private:
UpdateByOperationProto updateByProto_;
};
} // namespace deephaven::client::impl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
#pragma once

#include <string_view>
#include <vector>
#include <google/protobuf/repeated_ptr_field.h>

namespace deephaven::client::highlevel::impl {
} // namespace deephaven::client::highlevel::impl
namespace deephaven::client::impl {
template<typename T>
void moveVectorData(std::vector<T> src, google::protobuf::RepeatedPtrField<T> *dest) {
for (auto &s: src) {
dest->Add(std::move(s));
}
}
} // namespace deephaven::client::impl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class Server : public std::enable_shared_from_this<Server> {
typedef io::deephaven::proto::backplane::grpc::SortDescriptor SortDescriptor;
typedef io::deephaven::proto::backplane::grpc::Ticket Ticket;
typedef io::deephaven::proto::backplane::grpc::TableService TableService;
typedef io::deephaven::proto::backplane::grpc::UpdateByRequest::UpdateByOperation UpdateByOperation;
typedef io::deephaven::proto::backplane::script::grpc::BindTableToVariableResponse BindTableToVariableResponse;
typedef io::deephaven::proto::backplane::script::grpc::ConsoleService ConsoleService;
typedef io::deephaven::proto::backplane::script::grpc::StartConsoleResponse StartConsoleResponse;
Expand Down Expand Up @@ -223,6 +224,10 @@ class Server : public std::enable_shared_from_this<Server> {
Ticket rightTableTicket, std::vector<std::string> columnsToMatch,
std::vector<std::string> columnsToAdd, std::shared_ptr<EtcCallback> etcCallback, Ticket result);

void updateByAsync(Ticket source, std::vector<UpdateByOperation> operations,
std::vector<std::string> groupByColumns,
std::shared_ptr<EtcCallback> etcCallback, Ticket result);

void bindToVariableAsync(const Ticket &consoleId, const Ticket &tableId, std::string variable,
std::shared_ptr<SFCallback<BindTableToVariableResponse>> callback);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
#pragma once

Expand All @@ -8,6 +8,7 @@
#include "deephaven/client/columns.h"
#include "deephaven/client/client_options.h"
#include "deephaven/client/expressions.h"
#include "deephaven/dhcore/clienttable/schema.h"
#include "deephaven/dhcore/ticking/ticking.h"
#include "deephaven/dhcore/utility/callbacks.h"

Expand Down Expand Up @@ -52,6 +53,7 @@ namespace deephaven::client {
class Client;
class TableHandle;
class TableHandleManager;
class UpdateByOperation;
namespace internal {
class TableHandleStreamAdaptor;
} // namespace internal
Expand Down Expand Up @@ -616,6 +618,7 @@ struct StringHolder {
* server resource is destructed, the resource will be released.
*/
class TableHandle {
typedef deephaven::dhcore::clienttable::Schema Schema;
typedef deephaven::dhcore::ticking::TickingCallback TickingCallback;
typedef deephaven::dhcore::ticking::TickingUpdate TickingUpdate;
typedef deephaven::client::BooleanExpression BooleanExpression;
Expand Down Expand Up @@ -1244,6 +1247,8 @@ class TableHandle {
TableHandle exactJoin(const TableHandle &rightSide, std::vector<MatchWithColumn> columnsToMatch,
std::vector<SelectColumn> columnsToAdd) const;

TableHandle updateBy(std::vector<UpdateByOperation> ops, std::vector<std::string> by) const;

/**
* Binds this table to a variable name in the QueryScope.
* @param variable The QueryScope variable to bind to.
Expand Down Expand Up @@ -1337,12 +1342,17 @@ class TableHandle {
/**
* Number of rows in the table at the time this TableHandle was created.
*/
int64_t numRows();
int64_t numRows() const;

/**
* Whether the table was static at the time this TableHandle was created.
*/
bool isStatic();
bool isStatic() const;

/**
* Returns the table's Schema.
*/
std::shared_ptr<Schema> schema() const;

/**
* Used internally. Returns the underlying impl object.
Expand Down
Loading

0 comments on commit decee54

Please sign in to comment.