Skip to content

Commit

Permalink
state history plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
tbfleming committed Oct 18, 2018
1 parent 8d9f326 commit 1db9370
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ class history_log {
return log;
}

chain::block_id_type get_block_id(uint32_t block_num) {
EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
"read non-existing block in ${name}.log", ("name", name));
history_summary summary;
index.seekg((block_num - _begin_block) * sizeof(summary));
index.read((char*)&summary, sizeof(summary));
return summary.block_id;
}

private:
bool get_last_block(uint64_t size) {
history_log_header header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,52 @@ struct table_delta {
std::vector<std::pair<bool, bytes>> rows{};
};

struct block_position {
uint32_t block_num = 0;
chain::block_id_type block_id = {};
};

struct get_status_request_v0 {};

struct get_status_result_v0 {
uint32_t head_block_num = 0;
chain::block_id_type head_block_id = {};
uint32_t last_irreversible_block_num = 0;
chain::block_id_type last_irreversible_block_id = {};
uint32_t block_state_begin_block = 0;
uint32_t block_state_end_block = 0;
uint32_t trace_begin_block = 0;
uint32_t trace_end_block = 0;
uint32_t chain_state_begin_block = 0;
uint32_t chain_state_end_block = 0;
block_position head = {};
block_position last_irreversible = {};
uint32_t block_state_begin_block = 0;
uint32_t block_state_end_block = 0;
uint32_t trace_begin_block = 0;
uint32_t trace_end_block = 0;
uint32_t chain_state_begin_block = 0;
uint32_t chain_state_end_block = 0;
};

struct get_blocks_request_v0 {
uint32_t start_block_num = 0;
uint32_t end_block_num = 0;
uint32_t max_messages_in_flight = 0;
std::vector<block_position> have_positions = {};
bool irreversible_only = false;
bool fetch_block = false;
bool fetch_block_state = false;
bool fetch_traces = false;
bool fetch_deltas = false;
};

struct get_block_request_v0 {
uint32_t block_num = 0;
bool fetch_block = false;
bool fetch_block_state = false;
bool fetch_traces = false;
bool fetch_deltas = false;
struct get_blocks_ack_request_v0 {
uint32_t num_messages = 0;
};

struct get_block_result_v0 {
uint32_t block_num = 0;
fc::optional<bytes> block;
fc::optional<bytes> block_state;
fc::optional<bytes> traces;
fc::optional<bytes> deltas;
struct get_blocks_result_v0 {
block_position head;
block_position last_irreversible;
fc::optional<block_position> this_block;
fc::optional<bytes> block;
fc::optional<bytes> block_state;
fc::optional<bytes> traces;
fc::optional<bytes> deltas;
};

using state_request = fc::static_variant<get_status_request_v0, get_block_request_v0>;
using state_result = fc::static_variant<get_status_result_v0, get_block_result_v0>;
using state_request = fc::static_variant<get_status_request_v0, get_blocks_request_v0, get_blocks_ack_request_v0>;
using state_result = fc::static_variant<get_status_result_v0, get_blocks_result_v0>;

class state_history_plugin : public plugin<state_history_plugin> {
public:
Expand All @@ -77,10 +90,13 @@ class state_history_plugin : public plugin<state_history_plugin> {
} // namespace eosio

FC_REFLECT(eosio::table_delta, (struct_version)(name)(rows));
FC_REFLECT(eosio::block_position, (block_num)(block_id));
FC_REFLECT_EMPTY(eosio::get_status_request_v0);
FC_REFLECT(
eosio::get_status_result_v0,
(head_block_num)(head_block_id)(last_irreversible_block_num)(last_irreversible_block_id)(block_state_begin_block)(
block_state_end_block)(trace_begin_block)(trace_end_block)(chain_state_begin_block)(chain_state_end_block));
FC_REFLECT(eosio::get_block_request_v0, (block_num)(fetch_block)(fetch_block_state)(fetch_traces)(fetch_deltas));
FC_REFLECT(eosio::get_block_result_v0, (block_num)(block)(block_state)(traces)(deltas));
FC_REFLECT(eosio::get_status_result_v0,
(head)(last_irreversible)(block_state_begin_block)(block_state_end_block)(trace_begin_block)(
trace_end_block)(chain_state_begin_block)(chain_state_end_block));
FC_REFLECT(eosio::get_blocks_request_v0,
(start_block_num)(end_block_num)(max_messages_in_flight)(have_positions)(irreversible_only)(fetch_block)(
fetch_block_state)(fetch_traces)(fetch_deltas));
FC_REFLECT(eosio::get_blocks_ack_request_v0, (num_messages));
FC_REFLECT(eosio::get_blocks_result_v0, (head)(last_irreversible)(this_block)(block)(block_state)(traces)(deltas));
115 changes: 98 additions & 17 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
bool stopping = false;
fc::optional<scoped_connection> applied_transaction_connection;
fc::optional<scoped_connection> accepted_block_connection;
fc::optional<scoped_connection> irreversible_block_connection;
string endpoint_address = "0.0.0.0";
uint16_t endpoint_port = 4321;
std::unique_ptr<tcp::acceptor> acceptor;
Expand All @@ -68,16 +69,39 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}

void get_block(uint32_t block_num, fc::optional<bytes>& result) {
auto p = chain_plug->chain().fetch_block_by_number(block_num);
chain::signed_block_ptr p;
try {
p = chain_plug->chain().fetch_block_by_number(block_num);
} catch (...) {
return;
}
result = fc::raw::pack(*p);
}

fc::optional<chain::block_id_type> get_block_id(uint32_t block_num) {
if (block_state_log && block_num >= block_state_log->begin_block() && block_num < block_state_log->end_block())
return block_state_log->get_block_id(block_num);
if (trace_log && block_num >= trace_log->begin_block() && block_num < trace_log->end_block())
return trace_log->get_block_id(block_num);
if (chain_state_log && block_num >= chain_state_log->begin_block() && block_num < chain_state_log->end_block())
return chain_state_log->get_block_id(block_num);
try {
auto block = chain_plug->chain().fetch_block_by_number(block_num);
if (block)
return block->id();
} catch (...) {
}
return {};
}

struct session : std::enable_shared_from_this<session> {
std::shared_ptr<state_history_plugin_impl> plugin;
std::unique_ptr<ws::stream<tcp::socket>> stream;
bool sending = false;
bool sentAbi = false;
std::vector<std::vector<char>> send_queue;
fc::optional<get_blocks_request_v0> current_request;
bool need_to_send_update = false;

session(std::shared_ptr<state_history_plugin_impl> plugin)
: plugin(std::move(plugin)) {}
Expand Down Expand Up @@ -129,8 +153,10 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}

void send() {
if (sending || send_queue.empty())
if (sending)
return;
if (send_queue.empty())
return send_get_blocks();
sending = true;
stream->binary(sentAbi);
sentAbi = true;
Expand All @@ -151,10 +177,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
void operator()(get_status_request_v0&) {
auto& chain = plugin->chain_plug->chain();
get_status_result_v0 result;
result.head_block_num = chain.head_block_num();
result.head_block_id = chain.head_block_id();
result.last_irreversible_block_num = chain.last_irreversible_block_num();
result.last_irreversible_block_id = chain.last_irreversible_block_id();
result.head = {chain.head_block_num(), chain.head_block_id()};
result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
if (plugin->block_state_log) {
result.block_state_begin_block = plugin->block_state_log->begin_block();
result.block_state_end_block = plugin->block_state_log->end_block();
Expand All @@ -170,18 +194,58 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
send(std::move(result));
}

void operator()(get_block_request_v0& req) {
// ilog("${b} get_block_request_v0", ("b", req.block_num));
get_block_result_v0 result{req.block_num};
if (req.fetch_block)
plugin->get_block(req.block_num, result.block);
if (req.fetch_block_state && plugin->block_state_log)
plugin->get_data(*plugin->block_state_log, req.block_num, result.block_state);
if (req.fetch_traces && plugin->trace_log)
plugin->get_data(*plugin->trace_log, req.block_num, result.traces);
if (req.fetch_deltas && plugin->chain_state_log)
plugin->get_data(*plugin->chain_state_log, req.block_num, result.deltas);
void operator()(get_blocks_request_v0& req) {
for (auto& cp : req.have_positions) {
if (req.start_block_num <= cp.block_num)
continue;
auto id = plugin->get_block_id(cp.block_num);
if (!id || *id != cp.block_id)
req.start_block_num = cp.block_num;
}
req.have_positions.clear();
current_request = req;
send_get_blocks(true);
}

void operator()(get_blocks_ack_request_v0& req) {
if (!current_request)
return;
current_request->max_messages_in_flight += req.num_messages;
send_get_blocks();
}

void send_get_blocks(bool changed = false) {
if (changed)
need_to_send_update = true;
if (!send_queue.empty() || !need_to_send_update || !current_request ||
!current_request->max_messages_in_flight)
return;
auto& chain = plugin->chain_plug->chain();
get_blocks_result_v0 result;
result.head = {chain.head_block_num(), chain.head_block_id()};
result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()};
uint32_t current =
current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num;
if (current_request->start_block_num <= current &&
current_request->start_block_num < current_request->end_block_num) {
auto block_id = plugin->get_block_id(current_request->start_block_num);
if (block_id) {
result.this_block = block_position{current_request->start_block_num, *block_id};
if (current_request->fetch_block)
plugin->get_block(current_request->start_block_num, result.block);
if (current_request->fetch_block_state && plugin->block_state_log)
plugin->get_data(*plugin->block_state_log, current_request->start_block_num, result.block_state);
if (current_request->fetch_traces && plugin->trace_log)
plugin->get_data(*plugin->trace_log, current_request->start_block_num, result.traces);
if (current_request->fetch_deltas && plugin->chain_state_log)
plugin->get_data(*plugin->chain_state_log, current_request->start_block_num, result.deltas);
}
++current_request->start_block_num;
}
send(std::move(result));
--current_request->max_messages_in_flight;
need_to_send_update = current_request->start_block_num <= current &&
current_request->start_block_num < current_request->end_block_num;
}

template <typename F>
Expand Down Expand Up @@ -292,6 +356,20 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
store_block_state(block_state);
store_traces(block_state);
store_chain_state(block_state);
for (auto& s : sessions) {
auto& p = s.second;
if (p) {
if (p->current_request && block_state->block_num < p->current_request->start_block_num)
p->current_request->start_block_num = block_state->block_num;
p->send_get_blocks(true);
}
}
}

void on_irreversible_block() {
// todo: get irreversible from block_state in on_accepted_block instead
for (auto& s : sessions)
s.second->send_get_blocks(true);
}

void store_block_state(const block_state_ptr& block_state) {
Expand Down Expand Up @@ -458,6 +536,8 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
chain.applied_transaction.connect([&](const transaction_trace_ptr& p) { my->on_applied_transaction(p); }));
my->accepted_block_connection.emplace(
chain.accepted_block.connect([&](const block_state_ptr& p) { my->on_accepted_block(p); }));
my->irreversible_block_connection.emplace(
chain.irreversible_block.connect([&](const block_state_ptr&) { my->on_irreversible_block(); }));

auto dir_option = options.at("state-history-dir").as<bfs::path>();
boost::filesystem::path state_history_dir;
Expand Down Expand Up @@ -497,6 +577,7 @@ void state_history_plugin::plugin_startup() { my->listen(); }
void state_history_plugin::plugin_shutdown() {
my->applied_transaction_connection.reset();
my->accepted_block_connection.reset();
my->irreversible_block_connection.reset();
while (!my->sessions.empty())
my->sessions.begin()->second->close();
my->stopping = true;
Expand Down
35 changes: 25 additions & 10 deletions plugins/state_history_plugin/state_history_plugin_abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ extern const char* const state_history_plugin_abi = R"({
{
"name": "get_status_request_v0", "fields": []
},
{
"name": "block_position", "fields": [
{ "name": "block_num", "type": "uint32" },
{ "name": "block_id", "type": "checksum256" }
]
},
{
"name": "get_status_result_v0", "fields": [
{ "name": "head_block_num", "type": "uint32" },
{ "name": "head_block_id", "type": "checksum256" },
{ "name": "last_irreversible_block_num", "type": "uint32" },
{ "name": "last_irreversible_block_id", "type": "checksum256" },
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "block_state_begin_block", "type": "uint32" },
{ "name": "block_state_end_block", "type": "uint32" },
{ "name": "trace_begin_block", "type": "uint32" },
Expand All @@ -19,17 +23,28 @@ extern const char* const state_history_plugin_abi = R"({
]
},
{
"name": "get_block_request_v0", "fields": [
{ "name": "block_num", "type": "uint32" },
"name": "get_blocks_request_v0", "fields": [
{ "name": "start_block_num", "type": "uint32" },
{ "name": "end_block_num", "type": "uint32" },
{ "name": "max_messages_in_flight", "type": "uint32" },
{ "name": "have_positions", "type": "block_position[]" },
{ "name": "irreversible_only", "type": "bool" },
{ "name": "fetch_block", "type": "bool" },
{ "name": "fetch_block_state", "type": "bool" },
{ "name": "fetch_traces", "type": "bool" },
{ "name": "fetch_deltas", "type": "bool" }
]
},
{
"name": "get_block_result_v0", "fields": [
{ "name": "block_num", "type": "uint32" },
"name": "get_blocks_ack_request_v0", "fields": [
{ "name": "num_messages", "type": "uint32" }
]
},
{
"name": "get_blocks_result_v0", "fields": [
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
{ "name": "block", "type": "bytes?" },
{ "name": "block_state", "type": "bytes?" },
{ "name": "traces", "type": "bytes?" },
Expand Down Expand Up @@ -429,8 +444,8 @@ extern const char* const state_history_plugin_abi = R"({
{ "new_type_name": "transaction_id", "type": "checksum256" }
],
"variants": [
{ "name": "request", "types": ["get_status_request_v0", "get_block_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_block_result_v0"] },
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_ack_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0"] },
{ "name": "action_receipt", "types": ["action_receipt_v0"] },
{ "name": "action_trace", "types": ["action_trace_v0"] },
Expand Down

0 comments on commit 1db9370

Please sign in to comment.