Skip to content

Commit

Permalink
Quick integration with gRPC using protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
fergeben committed Apr 12, 2020
1 parent 0846677 commit 581ad01
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 340 deletions.
30 changes: 23 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ enable_testing()
set(MAIN_FILE src/main.cpp)
set(SRC_FILES
src/server.cpp
src/service_handler.cpp
src/response.cpp
src/msg/response.cpp
src/action_handler.cpp
src/db/entities/stream.cpp
src/db/entities/event.cpp
Expand All @@ -27,11 +26,12 @@ set(SRC_FILES
src/msg/serializer.cpp
inc_duktape/duktape.c
src/ext/duk_context.cpp
protos/yess.grpc.pb.cc
protos/yess.pb.cc
)
set(INC_FILES
src/server.hpp
src/service_handler.hpp
src/response.hpp
src/msg/response.hpp
src/action_handler.hpp
src/db/entities/stream.hpp
src/db/entities/event.hpp
Expand All @@ -41,18 +41,34 @@ set(INC_FILES
src/ext/context.hpp
src/ext/duk_context.hpp
src/log.hpp
src/grpc_server.hpp
protos/yess.grpc.pb.h
protos/yess.pb.h
)
add_library(${PROJECT_NAME}_objects OBJECT ${SRC_FILES} ${INCL_FILES})
add_subdirectory(utils)
add_subdirectory(debug)

target_include_directories(${PROJECT_NAME}_objects PRIVATE "${CMAKE_BINARY_DIR}/include" inc_duktape)
target_include_directories(${PROJECT_NAME}_objects PRIVATE "${CMAKE_BINARY_DIR}/include" inc_duktape protos)
add_executable(${PROJECT_NAME} ${MAIN_FILE} $<TARGET_OBJECTS:${PROJECT_NAME}_objects>)
target_include_directories(${PROJECT_NAME} PRIVATE inc_duktape)
target_include_directories(${PROJECT_NAME} PRIVATE inc_duktape protos)

add_subdirectory(test)

target_link_libraries(${PROJECT_NAME} PRIVATE ${CONAN_LIBS} yessutils)
find_package(Threads REQUIRED)

find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${protobuf_VERSION}")

find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")


target_link_libraries(${PROJECT_NAME} PRIVATE ${CONAN_LIBS} yessutils
gRPC::grpc++
gRPC::grpc++_reflection
protobuf::libprotobuf
)

add_custom_target(
run
Expand Down
2 changes: 2 additions & 0 deletions protos/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.pb.h
*.pb.cc
1 change: 1 addition & 0 deletions protos/compile.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
protoc -I. --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin yess.proto
37 changes: 37 additions & 0 deletions protos/yess.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
syntax = "proto3";

package yess;

message Event {
int32 id = 1;
int32 stream_id = 2;
string type = 3;
string payload = 4;
int32 version = 5;
}

message Stream {
int32 id = 1;
string type = 2;
int32 version = 3;
repeated Event events = 4;
}

message Response {
int32 status = 1;
string msg = 2;
}

message CreateStreamReq {
string type = 1;
}

message PushEventReq {
int32 stream_id = 1;
Event event = 2;
}

service YessService {
rpc CreateStream (CreateStreamReq) returns (Response);
rpc PushEvent (PushEventReq) returns (Response);
}
19 changes: 4 additions & 15 deletions src/action_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
#include "db/repositories/sqlite_stream_repo.hpp"
#include "db/repositories/stream_repository.hpp"
#include "log.hpp"
#include "response.hpp"
#include "server.hpp"
#include "msg/response.hpp"

using json = nlohmann::json;
using namespace yess::msg;

namespace yess
{
Expand Down Expand Up @@ -72,20 +72,9 @@ void Action_handler::save_stream(const yess::db::Stream &stream)
stream_repo_->create(stream);
}

Action_handler::Action_handler()
Action_handler::Action_handler(std::string conn_str)
{
Application &app = Server::instance();
Server *yess = static_cast<Server *>(&app);
auto connStr = yess->conn_str();
// TODO: DRY
std::string conn_key = (std::string)app.config().getString(
"EventStore.ConnectorKey", "SQLite");
std::string conn_str = (std::string)app.config().getString(
"EventStore.ConnectionString", "yess.db");
if (!connStr.empty()) {
conn_str = connStr;
}
auto stream_repo = db::Sqlite_stream_repo(conn_key, conn_str);
auto stream_repo = db::Sqlite_stream_repo("SQLite", conn_str);
// Repository initialization in the Server ctor.
stream_repo_ =
std::make_unique<db::Sqlite_stream_repo>(std::move(stream_repo));
Expand Down
8 changes: 3 additions & 5 deletions src/action_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
#include <memory>
#include <string>

#include "msg/response.hpp"
#include "nlohmann/json_fwd.hpp"
#include "response.hpp"
#include "server.hpp"

#include "db/entities/stream.hpp"
#include "db/repositories/stream_repository.hpp"
Expand All @@ -30,13 +29,12 @@ Action action_from_str(std::string action);
class Action_handler
{
public:
Action_handler();
Action_handler(std::string conn_str);
~Action_handler();
Response handle(const json &obj);
msg::Response handle(const json &obj);
void save_stream(const db::Stream &stream);

private:
Server *server_;
std::unique_ptr<db::Stream_repository> stream_repo_;
};
} // namespace yess
90 changes: 90 additions & 0 deletions src/grpc_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#pragma once

#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>

#include "../protos/yess.grpc.pb.h"
#include "action_handler.hpp"
#include "msg/response.hpp"
#include "nlohmann/json.hpp"
#include "yess.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using yess::CreateStreamReq;
using yess::PushEventReq;
using yess::Response;
using yess::YessService;
using json = nlohmann::json;

namespace yess
{
class YessServiceImpl : public YessService::Service
{
public:
YessServiceImpl(std::string conn_str)
: handler_(std::make_unique<yess::Action_handler>(conn_str))
{
}

private:
Status CreateStream(ServerContext *context,
const CreateStreamReq *request,
Response *reply) override
{
json cmd = {{"action", "CreateStream"}, {"type", request->type()}};

msg::Response resp = handler_->handle(cmd);
reply->set_msg(resp.getMessage());
reply->set_status((int)resp.getStatus());
return Status::OK;
}

Status PushEvent(ServerContext *context,
const PushEventReq *request,
Response *reply) override
{

json cmd = {{"action", "PushEvent"},
{"streamId", request->stream_id()},
{"type", request->event().type()},
{"payload", request->event().payload()},
{"version", request->event().version()}};

msg::Response resp = handler_->handle(cmd);
reply->set_msg(resp.getMessage());
reply->set_status((int)resp.getStatus());
return Status::OK;
}
std::unique_ptr<yess::Action_handler> handler_;
};

void run_server()
{
std::string server_address("0.0.0.0:2929");
YessServiceImpl service("yess.db");

grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;

// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
} // namespace yess
8 changes: 5 additions & 3 deletions src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "server.hpp"
#include "grpc_service.hpp"

int main(int argc, char **argv)
{
yess::Server server;
return server.run(argc, argv);
yess::run_server();

return 0;
}

4 changes: 2 additions & 2 deletions src/response.cpp → src/msg/response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "response.hpp"

namespace yess
namespace yess::msg
{
Response::Response(ResponseStatus status, std::string message)
: m_status(status), m_message(message)
Expand All @@ -14,4 +14,4 @@ bool Response::operator==(const Response &resp) const
{
return m_message == resp.m_message && m_status == resp.m_status;
}
} // namespace yess
} // namespace yess::msg
4 changes: 2 additions & 2 deletions src/response.hpp → src/msg/response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <string>

namespace yess
namespace yess::msg
{
enum class ResponseStatus { OK, Error };
class Response
Expand All @@ -17,4 +17,4 @@ class Response
ResponseStatus m_status;
std::string m_message;
};
} // namespace yess
} // namespace yess::msg
Loading

0 comments on commit 581ad01

Please sign in to comment.