Skip to content

Commit

Permalink
pw_rpc: Rework C++ client; implement raw client
Browse files Browse the repository at this point in the history
- Rework the Client class to share code with the Server class.
- Replace BaseClientCall with a call object also derived from the
  internal::Call used for server calls.
- Implement a system for testing RPC client invocations.
- Implement the raw RPC client API and codegen.
- Reimplement the Nanopb RPC client API and codegen to share code.
- Implement Nanopb client & bidirectional streaming.
- Add an integration test that calls RPCs on a C++ server from a C++
  client.

Requires: pigweed-internal:16720
Change-Id: Id3a0fd31bb3b3259fb9386dae617e68b9bfe6985
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/65745
Reviewed-by: Ewout van Bekkum <[email protected]>
Commit-Queue: Wyatt Hepler <[email protected]>
  • Loading branch information
255 authored and CQ Bot Account committed Oct 19, 2021
1 parent 75ab80a commit 82db4b1
Show file tree
Hide file tree
Showing 83 changed files with 3,248 additions and 1,892 deletions.
1 change: 1 addition & 0 deletions BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ group("integration_tests") {
if (pw_RUN_INTEGRATION_TESTS) {
_default_tc = _default_toolchain_prefix + pw_default_optimization_level
deps = [
"$dir_pw_rpc:cpp_client_server_integration_test($_default_tc)",
"$dir_pw_rpc/py:python_client_cpp_server_test.action($_default_tc)",
"$dir_pw_transfer/py:python_cpp_transfer_test.action($_default_tc)",
"$dir_pw_unit_test/py:rpc_service_test.action($_default_tc)",
Expand Down
1 change: 0 additions & 1 deletion pw_file/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pw_source_set("flat_file_system") {
public_deps = [
":proto.pwpb",
":proto.raw_rpc",
"$dir_pw_rpc/raw:method",
dir_pw_assert,
dir_pw_bytes,
dir_pw_log,
Expand Down
4 changes: 2 additions & 2 deletions pw_hdlc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pw_cc_library(
includes = ["public"],
deps = [
":pw_hdlc",
"//pw_rpc:server",
"//pw_rpc",
],
)

Expand All @@ -63,7 +63,7 @@ pw_cc_library(
includes = ["public"],
deps = [
":pw_hdlc",
"//pw_rpc:server",
"//pw_rpc",
],
)

Expand Down
2 changes: 1 addition & 1 deletion pw_hdlc/rpc_example/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ pw_cc_library(
"//pw_hdlc",
"//pw_hdlc:pw_rpc",
"//pw_log",
"//pw_rpc:server",
"//pw_rpc",
],
)
14 changes: 8 additions & 6 deletions pw_log_rpc/log_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {

LogService log_service(drain_map_);
const uint32_t output_buffer_size = 100;
rpc::RawFakeChannelOutput<output_buffer_size, 10, 512> output;
rpc::RawFakeChannelOutput<10, output_buffer_size, 512> output;
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
rpc::Server server(std::span(&channel, 1));

Expand All @@ -438,7 +438,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
EXPECT_TRUE(output.done());

// Make sure not all packets were sent.
ASSERT_EQ(output.total_stream_packets(), successful_packets_sent);
ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);

// Verify data in responses.
Vector<ConstByteSpan, total_entries> message_stack;
Expand Down Expand Up @@ -491,7 +491,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {

LogService log_service(drain_map_);
const uint32_t output_buffer_size = 100;
rpc::RawFakeChannelOutput<output_buffer_size, 10, 768> output;
rpc::RawFakeChannelOutput<10, output_buffer_size, 768> output;
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
rpc::Server server(std::span(&channel, 1));

Expand All @@ -515,7 +515,7 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
EXPECT_FALSE(output.done());

// Make some packets were sent.
ASSERT_GE(output.total_stream_packets(), min_packets_sent);
ASSERT_GE(output.payloads<Logs::Listen>().size(), min_packets_sent);

// Verify that not all the entries were sent.
size_t entries_found = 0;
Expand Down Expand Up @@ -554,11 +554,13 @@ TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
}

// More calls to flush with errors will not affect this stubborn drain.
const size_t previous_stream_packet_count = output.total_stream_packets();
const size_t previous_stream_packet_count =
output.payloads<Logs::Listen>().size();
output.set_send_status(Status::Unavailable());
EXPECT_EQ(drain.value()->Flush(), OkStatus());
EXPECT_FALSE(output.done());
ASSERT_EQ(output.total_stream_packets(), previous_stream_packet_count);
ASSERT_EQ(output.payloads<Logs::Listen>().size(),
previous_stream_packet_count);

output.clear();
EXPECT_EQ(drain.value()->Close(), OkStatus());
Expand Down
4 changes: 2 additions & 2 deletions pw_log_rpc/rpc_log_drain_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ TEST(RpcLogDrain, FlushingDrainWithOpenWriter) {
RpcLogDrainMap drain_map(drains);
LogService log_service(drain_map);

rpc::RawFakeChannelOutput<128, 3> output;
rpc::RawFakeChannelOutput<3, 128> output;
rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
rpc::Server server(std::span(&channel, 1));

Expand Down Expand Up @@ -144,7 +144,7 @@ TEST(RpcLogDrain, TryReopenOpenedDrain) {
RpcLogDrainMap drain_map(drains);
LogService log_service(drain_map);

rpc::RawFakeChannelOutput<128, 1> output;
rpc::RawFakeChannelOutput<1, 128> output;
rpc::Channel channel(rpc::Channel::Create<drain_id>(&output));
rpc::Server server(std::span(&channel, 1));

Expand Down
8 changes: 4 additions & 4 deletions pw_protobuf/py/pw_protobuf/proto_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,12 @@ def type(self) -> Type:
return self._type

def server_streaming(self) -> bool:
return (self._type is self.Type.SERVER_STREAMING
or self._type is self.Type.BIDIRECTIONAL_STREAMING)
return self._type in (self.Type.SERVER_STREAMING,
self.Type.BIDIRECTIONAL_STREAMING)

def client_streaming(self) -> bool:
return (self._type is self.Type.CLIENT_STREAMING
or self._type is self.Type.BIDIRECTIONAL_STREAMING)
return self._type in (self.Type.CLIENT_STREAMING,
self.Type.BIDIRECTIONAL_STREAMING)

def request_type(self) -> ProtoNode:
return self._request_type
Expand Down
7 changes: 4 additions & 3 deletions pw_protobuf_compiler/proto.gni
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ template("_pw_nanopb_rpc_proto_library") {
public_deps = [
":${invoker.base_target}.nanopb",
"$dir_pw_rpc:server",
"$dir_pw_rpc/nanopb:client",
"$dir_pw_rpc/nanopb:method_union",
"$dir_pw_rpc/nanopb:client_api",
"$dir_pw_rpc/nanopb:server_api",
"$dir_pw_third_party/nanopb",
] + invoker.deps
public = invoker.outputs
Expand Down Expand Up @@ -264,7 +264,8 @@ template("_pw_raw_rpc_proto_library") {
deps = [ ":$target_name._gen($pw_protobuf_compiler_TOOLCHAIN)" ]
public_deps = [
"$dir_pw_rpc:server",
"$dir_pw_rpc/raw:method_union",
"$dir_pw_rpc/raw:client_api",
"$dir_pw_rpc/raw:server_api",
] + invoker.deps
public = invoker.outputs
check_includes = false
Expand Down
Loading

0 comments on commit 82db4b1

Please sign in to comment.