Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #357 from EOSIO/subjective-bill-trx-hybrid-2.1
Browse files Browse the repository at this point in the history
Add decaying accumulator to subjective billing - 2.1.x
  • Loading branch information
heifner authored Mar 22, 2021
2 parents 1b5d32e + e1ad662 commit 26c5dfb
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 74 deletions.
65 changes: 65 additions & 0 deletions libraries/chain/include/eosio/chain/resource_limits_private.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,71 @@ namespace eosio { namespace chain { namespace resource_limits {
}
};

/**
* This class accumulates a value that decays over quantums based on inputs
* The decay is linear between updates and exponential if the set of inputs has no gaps
*
* The value stored is Precision times the sum of the inputs.
*/
template<uint64_t Precision = config::rate_limiting_precision>
struct exponential_decay_accumulator
{
static_assert( Precision > 0, "Precision must be positive" );
static constexpr uint64_t max_raw_value = std::numeric_limits<uint64_t>::max() / Precision;

exponential_decay_accumulator()
: last_ordinal(0)
, value_ex(0)
{
}

uint32_t last_ordinal; ///< The ordinal of the last period which has contributed to the accumulator
uint64_t value_ex; ///< The current accumulated value pre-multiplied by Precision

/**
* return the extended value at a current or future ordinal
*/
uint64_t value_ex_at( uint32_t ordinal, uint32_t window_size ) const {
if( last_ordinal < ordinal ) {
if( (uint64_t)last_ordinal + window_size > (uint64_t)ordinal ) {
const auto delta = ordinal - last_ordinal; // clearly 0 < delta < window_size
const auto decay = make_ratio(
(uint128_t)window_size - delta,
(uint128_t)window_size
);

return downgrade_cast<uint64_t>((uint128_t)value_ex * decay);
} else {
return 0;
}
} else {
return value_ex;
}
}

/**
* return the value at a current or future ordinal
*/
uint64_t value_at( uint32_t ordinal, uint32_t window_size ) const {
return integer_divide_ceil(value_ex_at(ordinal, window_size), Precision);
}

void add( uint64_t units, uint32_t ordinal, uint32_t window_size /* must be positive */ )
{
// check for some numerical limits before doing any state mutations
EOS_ASSERT(units <= max_raw_value, rate_limiting_state_inconsistent, "Usage exceeds maximum value representable after extending for precision");

uint128_t units_ex = (uint128_t)units * Precision;
if (last_ordinal < ordinal) {
value_ex = value_ex_at(ordinal, window_size);
last_ordinal = ordinal;
}

// saturate the value
uint128_t new_value_ex = std::min<uint128_t>(units_ex + (uint128_t)value_ex, std::numeric_limits<uint64_t>::max());
value_ex = downgrade_cast<uint64_t>(new_value_ex);
}
};
}

using usage_accumulator = impl::exponential_moving_average_accumulator<>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/block_state.hpp>
#include <eosio/chain/transaction.hpp>
#include <eosio/chain/resource_limits.hpp>
#include <eosio/chain/resource_limits_private.hpp>
#include <eosio/chain/config.hpp>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
Expand All @@ -17,6 +20,7 @@ using chain::transaction_id_type;
using chain::account_name;
using chain::block_state_ptr;
using chain::packed_transaction;
namespace config = chain::config;

class subjective_billing {
private:
Expand All @@ -38,43 +42,70 @@ class subjective_billing {
>
>;

using account_subjective_bill_cache = std::map<account_name, int32_t>;
using block_subjective_bill_cache = std::map<account_name, int32_t>;
using decaying_accumulator = chain::resource_limits::impl::exponential_decay_accumulator<>;

struct subjective_billing_info {
uint64_t pending_cpu_us; // tracked cpu us for transactions that may still succeed in a block
decaying_accumulator expired_accumulator; // accumulator used to account for transactions that have expired

bool empty(uint32_t time_ordinal) {
return pending_cpu_us == 0 && expired_accumulator.value_at(time_ordinal, expired_accumulator_average_window) == 0;
}
};

using account_subjective_bill_cache = std::map<account_name, subjective_billing_info>;
using block_subjective_bill_cache = std::map<account_name, uint64_t>;

bool _disabled = false;
trx_cache_index _trx_cache_index;
account_subjective_bill_cache _account_subjective_bill_cache;
block_subjective_bill_cache _block_subjective_bill_cache;

private:
void remove_subjective_billing( const trx_cache_entry& entry ) {
uint32_t time_ordinal_for( const fc::time_point& t ) const {
auto ordinal = t.time_since_epoch().count() / (1000U * (uint64_t)subjective_time_interval_ms);
EOS_ASSERT(ordinal <= std::numeric_limits<uint32_t>::max(), chain::tx_resource_exhaustion, "overflow of quantized time in subjective billing");
return ordinal;
}

void remove_subjective_billing( const trx_cache_entry& entry, uint32_t time_ordinal ) {
auto aitr = _account_subjective_bill_cache.find( entry.account );
if( aitr != _account_subjective_bill_cache.end() ) {
aitr->second -= entry.subjective_cpu_bill;
EOS_ASSERT( aitr->second >= 0, chain::tx_resource_exhaustion,
aitr->second.pending_cpu_us -= entry.subjective_cpu_bill;
EOS_ASSERT( aitr->second.pending_cpu_us >= 0, chain::tx_resource_exhaustion,
"Logic error in subjective account billing ${a}", ("a", entry.account) );
if( aitr->second == 0 ) _account_subjective_bill_cache.erase( aitr );
if( aitr->second.empty(time_ordinal) ) _account_subjective_bill_cache.erase( aitr );
}
}

void transition_to_expired( const trx_cache_entry& entry, uint32_t time_ordinal ) {
auto aitr = _account_subjective_bill_cache.find( entry.account );
if( aitr != _account_subjective_bill_cache.end() ) {
aitr->second.pending_cpu_us -= entry.subjective_cpu_bill;
aitr->second.expired_accumulator.add(entry.subjective_cpu_bill, time_ordinal, expired_accumulator_average_window);
}
}

void remove_subjective_billing( const block_state_ptr& bsp ) {
void remove_subjective_billing( const block_state_ptr& bsp, uint32_t time_ordinal ) {
if( !_trx_cache_index.empty() ) {
for( const auto& receipt : bsp->block->transactions ) {
if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
const auto& pt = std::get<packed_transaction>(receipt.trx);
remove_subjective_billing( pt.id() );
remove_subjective_billing( pt.id(), time_ordinal );
}
}
}
}

public: // public for tests
static constexpr uint32_t subjective_time_interval_ms = 5'000;
static constexpr uint32_t expired_accumulator_average_window = config::account_cpu_usage_average_window_ms / subjective_time_interval_ms;

void remove_subjective_billing( const transaction_id_type& trx_id ) {
void remove_subjective_billing( const transaction_id_type& trx_id, uint32_t time_ordinal ) {
auto& idx = _trx_cache_index.get<by_id>();
auto itr = idx.find( trx_id );
if( itr != idx.end() ) {
remove_subjective_billing( *itr );
remove_subjective_billing( *itr, time_ordinal );
idx.erase( itr );
}
}
Expand All @@ -94,44 +125,61 @@ class subjective_billing {
bill,
expire} );
if( p.second ) {
_account_subjective_bill_cache[first_auth] += bill;
_account_subjective_bill_cache[first_auth].pending_cpu_us += bill;
if( in_pending_block ) {
// if in_pending_block then we have billed user until block is aborted/applied
// keep track of this double bill amount so we can subtract it out in get_subjective_bill
_block_subjective_bill_cache[first_auth] += bill;
}
}
}
}

uint32_t get_subjective_bill( const account_name& first_auth ) const {
void subjective_bill_failure( const account_name& first_auth, const fc::microseconds& elapsed, const fc::time_point& now )
{
if( !_disabled ) {
uint32_t bill = std::max<int64_t>( 0, elapsed.count() );
const auto time_ordinal = time_ordinal_for(now);
_account_subjective_bill_cache[first_auth].expired_accumulator.add(bill, time_ordinal, expired_accumulator_average_window);
}
}

uint32_t get_subjective_bill( const account_name& first_auth, const fc::time_point& now ) const {
if( _disabled ) return 0;
int32_t sub_bill = 0;
const auto time_ordinal = time_ordinal_for(now);
const subjective_billing_info* sub_bill_info = nullptr;
auto aitr = _account_subjective_bill_cache.find( first_auth );
if( aitr != _account_subjective_bill_cache.end() ) {
sub_bill = aitr->second;
auto bitr = _block_subjective_bill_cache.find( first_auth );
if( bitr != _block_subjective_bill_cache.end() ) {
sub_bill -= bitr->second;
}
sub_bill_info = &aitr->second;
}
uint64_t in_block_pending_cpu_us = 0;
auto bitr = _block_subjective_bill_cache.find( first_auth );
if( bitr != _block_subjective_bill_cache.end() ) {
in_block_pending_cpu_us = bitr->second;
}

if (sub_bill_info) {
EOS_ASSERT(sub_bill_info->pending_cpu_us >= in_block_pending_cpu_us, chain::tx_resource_exhaustion, "Logic error subjective billing ${a}", ("a", first_auth) );
uint32_t sub_bill = sub_bill_info->pending_cpu_us - in_block_pending_cpu_us + sub_bill_info->expired_accumulator.value_at(time_ordinal, expired_accumulator_average_window );
return sub_bill;
} else {
return 0;
}
EOS_ASSERT( sub_bill >= 0, chain::tx_resource_exhaustion, "Logic error subjective billing ${a}", ("a", first_auth) );
return sub_bill;
}

void abort_block() {
_block_subjective_bill_cache.clear();
}

void on_block( const block_state_ptr& bsp ) {
void on_block( const block_state_ptr& bsp, const fc::time_point& now ) {
if( bsp == nullptr ) return;
remove_subjective_billing( bsp );
const auto time_ordinal = time_ordinal_for(now);
remove_subjective_billing( bsp, time_ordinal );
}

bool remove_expired( fc::logger& log, const fc::time_point& pending_block_time, const fc::time_point& deadline ) {
bool remove_expired( fc::logger& log, const fc::time_point& pending_block_time, const fc::time_point& now, const fc::time_point& deadline ) {
bool exhausted = false;
auto& idx = _trx_cache_index.get<by_expiry>();
if( !idx.empty() ) {
const auto time_ordinal = time_ordinal_for(now);
const auto orig_count = _trx_cache_index.size();
uint32_t num_expired = 0;

Expand All @@ -142,7 +190,7 @@ class subjective_billing {
}
auto b = idx.begin();
if( b->expiry > pending_block_time ) break;
remove_subjective_billing( *b );
transition_to_expired( *b, time_ordinal );
idx.erase( b );
num_expired++;
}
Expand Down
14 changes: 7 additions & 7 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
void on_block( const block_state_ptr& bsp ) {
auto before = _unapplied_transactions.size();
_unapplied_transactions.clear_applied( bsp );
_subjective_billing.on_block( bsp );
_subjective_billing.on_block( bsp, fc::time_point::now() );
fc_dlog( _log, "Removed applied transactions before: ${before}, after: ${after}",
("before", before)("after", _unapplied_transactions.size()) );
}
Expand Down Expand Up @@ -536,7 +536,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
auto first_auth = trx->packed_trx()->get_transaction().first_authorizer();
uint32_t sub_bill = 0;
if( _pending_block_mode != pending_block_mode::producing)
sub_bill = _subjective_billing.get_subjective_bill( first_auth );
sub_bill = _subjective_billing.get_subjective_bill( first_auth, fc::time_point::now() );

auto trace = chain.push_transaction( trx, deadline, trx->billed_cpu_time_us, false, sub_bill );
if( trace->except ) {
Expand All @@ -553,7 +553,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
exhausted = block_is_exhausted();
} else {
_subjective_billing.subjective_bill( trx->id(), expire, first_auth, trace->elapsed, false );
_subjective_billing.subjective_bill_failure( first_auth, trace->elapsed, fc::time_point::now() );
auto e_ptr = trace->except->dynamic_copy_exception();
send_response( e_ptr );
}
Expand Down Expand Up @@ -1586,7 +1586,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
return start_block_result::exhausted;
if( !remove_expired_blacklisted_trxs( preprocess_deadline ) )
return start_block_result::exhausted;
if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), preprocess_deadline ) )
if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), fc::time_point::now(), preprocess_deadline ) )
return start_block_result::exhausted;

// limit execution of pending incoming to once per block
Expand Down Expand Up @@ -1836,7 +1836,7 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin
("c", trace->except->code())("p", prev_billed_cpu_time_us)
("r", fc::time_point::now() - start)("id", trx->id()) );
account_fails.add( first_auth, failure_code );
_subjective_billing.subjective_bill( trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed, false );
_subjective_billing.subjective_bill_failure( first_auth, trace->elapsed, fc::time_point::now() );
}
++num_failed;
if( itr->next ) itr->next( trace );
Expand Down Expand Up @@ -2143,7 +2143,7 @@ bool producer_plugin_impl::maybe_produce_block() {
try {
produce_block();
return true;
}
}
catch(block_validation_error&) {
reason = "block vault rejected block, waiting on external block to continue";
}
Expand Down Expand Up @@ -2257,7 +2257,7 @@ void producer_plugin_impl::produce_block() {
_block_vault_resync.schedule();
EOS_ASSERT(false, block_validation_error, "Block rejected by block vault");
}

}

chain.commit_block();
Expand Down
Loading

0 comments on commit 26c5dfb

Please sign in to comment.