Skip to content

Commit

Permalink
Implement asset stats tracking with delta updates
Browse files Browse the repository at this point in the history
  • Loading branch information
shesek committed Jun 29, 2019
1 parent cec44c7 commit 8f3a393
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 17 deletions.
4 changes: 4 additions & 0 deletions doc/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ If the `blockhash` was since orphaned, the cache is removed and re-computed.
* `"A{scripthash}" → "{stats}{blockhash}"` (where `stats` is composed of `tx_count`, `funded_txo_{count,sum}` and `spent_txo_{count,sum}`)

* `"U{scripthash}" → "{utxo}{blockhash}"` (where `utxo` is a set of `(txid,vout)` outpoints)

Elements only:

* `"z{asset-id}" → "{stats}{blockhash}"` (where `stats` is composed of `tx_count`, `issuance_count`, `issued_amount`, `issued_amount_known`)
144 changes: 137 additions & 7 deletions src/elements/asset.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin_hashes::{hex::FromHex, sha256, sha256d, Hash};
use elements::confidential::Asset;
use elements::confidential::{Asset, Value};
use elements::{AssetIssuance, OutPoint, Transaction, TxIn, TxOut};

use crate::errors::*;
use crate::new_index::schema::{
FundingInfo, SpendingInfo, TxHistoryInfo, TxHistoryKey, TxHistoryRow,
};
use crate::new_index::{parse_hash, DBRow, DB};
use crate::new_index::{db::DBFlush, parse_hash, ChainQuery, DBRow};
use crate::util::{full_hash, has_prevout, is_spendable, Bytes, FullHash, TxInput};

use crate::elements::{
Expand All @@ -26,14 +26,15 @@ lazy_static! {
.unwrap();
}

// Internal representation
#[derive(Serialize)]
pub struct AssetEntry {
pub asset_id: sha256d::Hash,
pub issuance_txin: TxInput,
pub issuance_prevout: OutPoint,
pub contract_hash: sha256d::Hash,

pub chain_stats: AssetStats,

// optional metadata from registry
#[serde(flatten)]
pub meta: Option<AssetMeta>,
Expand All @@ -50,7 +51,12 @@ pub struct AssetRowValue {
}

impl AssetEntry {
pub fn new(asset_hash: &[u8], asset: AssetRowValue, meta: Option<AssetMeta>) -> Self {
pub fn new(
asset_hash: &[u8],
asset: AssetRowValue,
chain_stats: AssetStats,
meta: Option<AssetMeta>,
) -> Self {
let issuance: AssetIssuance =
deserialize(&asset.issuance).expect("failed parsing AssetIssuance");

Expand All @@ -69,6 +75,7 @@ impl AssetEntry {
vout: asset.prev_vout as u32,
},
contract_hash,
chain_stats,
meta,
}
}
Expand All @@ -79,6 +86,8 @@ pub struct IssuanceInfo {
pub txid: FullHash,
pub vin: u16,
pub is_reissuance: bool,
// None for blinded issuances
pub issued_amount: Option<u64>,
}

// TODO: index mempool transactions
Expand Down Expand Up @@ -140,6 +149,11 @@ pub fn index_elements_transaction(
let asset_hash = asset_id.into_inner().into_inner();
let asset = Asset::Explicit(sha256d::Hash::from_inner(asset_hash.clone()));

let issued_amount = match txi.asset_issuance.amount {
Value::Explicit(amount) => Some(amount),
_ => None,
};

// the initial issuance is kept twice: once in the history index under I<asset><height><txid:vin>,
// and once separately under i<asset> for asset lookup with some more associated metadata.
// reissuances are only kept under the history index.
Expand All @@ -151,6 +165,7 @@ pub fn index_elements_transaction(
txid,
vin: txi_index as u16,
is_reissuance,
issued_amount,
}),
);
rows.push(history.to_row());
Expand Down Expand Up @@ -196,15 +211,18 @@ fn asset_history_row(asset: &Asset, confirmed_height: u32, txinfo: TxHistoryInfo
}

pub fn lookup_asset(
history_db: &DB,
chain: &ChainQuery,
registry: Option<&AssetRegistry>,
asset_hash: &[u8],
) -> Result<Option<AssetEntry>> {
let history_db = chain.store().history_db();

if let Some(row) = history_db.get(&[b"i", &asset_hash[..]].concat()) {
let row = bincode::deserialize(&row).expect("failed to parse AssetRowValue");
let asset_id = sha256d::Hash::from_slice(asset_hash).chain_err(|| "invalid asset hash")?;
let meta = registry.map_or_else(|| Ok(None), |r| r.load(asset_id))?;
Ok(Some(AssetEntry::new(asset_hash, row, meta)))
let chain_stats = asset_stats(chain, asset_hash);
Ok(Some(AssetEntry::new(asset_hash, row, chain_stats, meta)))
} else {
Ok(None)
}
Expand All @@ -228,3 +246,115 @@ pub fn get_issuance_assetid(txin: &TxIn) -> Result<AssetId> {

Ok(AssetId::from_entropy(entropy))
}

// Asset stats

// Cumulative per-asset statistics, cached in the db under "z{asset-id}"
// (see doc/schema.md). Serialized positionally with bincode, so field
// order must not change.
#[derive(Serialize, Deserialize, Debug)]
pub struct AssetStats {
    pub tx_count: usize,
    pub issuance_count: usize,
    pub issued_amount: u64,
    // false if there was at least one blinded (re)issuance
    pub issued_amount_known: bool,
}

// Implement the standard `Default` trait rather than an inherent
// `fn default()`, so `AssetStats::default()` keeps working at existing
// call sites while also composing with generic code expecting `Default`.
impl Default for AssetStats {
    fn default() -> Self {
        Self {
            tx_count: 0,
            issuance_count: 0,
            issued_amount: 0,
            // starts out true; flipped to false the first time a blinded
            // (re)issuance is encountered during stats accumulation
            issued_amount_known: true,
        }
    }
}

// Cache-db key for an asset's stats: the ASCII prefix 'z' followed by the
// raw asset id bytes (see doc/schema.md).
fn asset_cache_key(asset_hash: &[u8]) -> Bytes {
    let mut key = Vec::with_capacity(1 + asset_hash.len());
    key.push(b'z');
    key.extend_from_slice(asset_hash);
    key
}
// Builds the cache-db row holding an asset's stats. The value pairs the
// stats with the blockhash they were computed at, so the cached entry can
// be invalidated if that block is later orphaned.
fn asset_cache_row(asset_hash: &[u8], stats: &AssetStats, blockhash: &sha256d::Hash) -> DBRow {
    let value = bincode::serialize(&(stats, blockhash)).unwrap();
    DBRow {
        key: asset_cache_key(asset_hash),
        value,
    }
}

// Returns the up-to-date stats for `asset_hash`, reusing the cached stats
// when available and only scanning history rows added since they were
// last computed.
pub fn asset_stats(chain: &ChainQuery, asset_hash: &[u8]) -> AssetStats {
    let cache_db = chain.store().cache_db();

    // The cached stats together with the height of the block they were
    // computed at. Treated as a cache miss when that block is no longer
    // part of the best chain (i.e. was orphaned).
    let cached: Option<(AssetStats, usize)> = cache_db
        .get(&asset_cache_key(asset_hash))
        .map(|c| bincode::deserialize(&c).unwrap())
        .and_then(|(stats, blockhash)| {
            let height = chain.height_by_hash(&blockhash)?;
            Some((stats, height))
        });

    // Catch up from the block following the cached one, or do a full scan
    // from genesis on a cache miss.
    let (newstats, lastblock) = match cached {
        Some((oldstats, height)) => asset_stats_delta(chain, asset_hash, oldstats, height + 1),
        None => asset_stats_delta(chain, asset_hash, AssetStats::default(), 0),
    };

    // Persist the refreshed stats keyed to the last block they cover.
    // `lastblock` is None when no new confirmed entries were found, in
    // which case there is nothing new to cache.
    if let Some(lastblock) = lastblock {
        cache_db.write(
            vec![asset_cache_row(asset_hash, &newstats, &lastblock)],
            DBFlush::Enable,
        );
    }

    newstats
}

// Scans the asset's 'I' history index rows starting at `start_height` and
// folds the new entries into `init_stats`. Returns the updated stats plus
// the hash of the last block that contributed entries, or None when no
// confirmed entries were found at or above `start_height`.
fn asset_stats_delta(
    chain: &ChainQuery,
    asset_hash: &[u8],
    init_stats: AssetStats,
    start_height: usize,
) -> (AssetStats, Option<sha256d::Hash>) {
    // Rows whose tx has no confirming block (e.g. reorged out) are skipped.
    let history_iter = chain
        .history_iter_scan(b'I', asset_hash, start_height)
        .map(TxHistoryRow::from_row)
        .filter_map(|history| {
            chain
                .tx_confirming_block(&history.get_txid())
                .map(|blockid| (history, blockid))
        });

    let mut stats = init_stats;
    let mut seen_txids = HashSet::new();
    let mut lastblock = None;

    for (history, blockid) in history_iter {
        // The scan is keyed by height, so entries of the same block arrive
        // contiguously; the txid dedup set only needs to span one block.
        // NOTE(review): assumes a txid is confirmed in a single block — the
        // set is reset whenever the confirming block changes.
        if lastblock != Some(blockid.hash) {
            seen_txids.clear();
        }

        // Count each transaction once, even when it contributes multiple
        // history entries for this asset (funding/spending/issuance).
        if seen_txids.insert(history.get_txid()) {
            stats.tx_count += 1;
        }

        match history.key.txinfo {
            // Funding/spending entries only affect tx_count (above).
            TxHistoryInfo::Funding(_) => {}

            TxHistoryInfo::Spending(_) => {}

            TxHistoryInfo::Issuance(issuance) => {
                stats.issuance_count += 1;

                match issuance.issued_amount {
                    Some(amount) => stats.issued_amount += amount,
                    // issued_amount is None for blinded issuances, after
                    // which the total issued amount cannot be known.
                    None => stats.issued_amount_known = false,
                }
            }
        }

        lastblock = Some(blockid.hash);
    }

    (stats, lastblock)
}
2 changes: 1 addition & 1 deletion src/new_index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod db;
pub mod db;
mod fetch;
mod mempool;
pub mod precache;
Expand Down
6 changes: 1 addition & 5 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ impl Query {

#[cfg(feature = "liquid")]
pub fn lookup_asset(&self, asset_hash: &[u8]) -> Result<Option<AssetEntry>> {
lookup_asset(
&self.chain.store().history_db(),
self.asset_db.as_ref(),
asset_hash,
)
lookup_asset(&self.chain, self.asset_db.as_ref(), asset_hash)
}
}
12 changes: 8 additions & 4 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl Store {
&self.history_db
}

/// The database used for cached precomputed data (e.g. aggregated
/// asset stats — see doc/schema.md).
pub fn cache_db(&self) -> &DB {
    &self.cache_db
}

pub fn done_initial_sync(&self) -> bool {
self.txstore_db.get(b"t").is_some()
}
Expand Down Expand Up @@ -324,7 +328,7 @@ impl ChainQuery {
})
}

fn history_iter_scan(&self, code: u8, hash: &[u8], start_height: usize) -> ScanIterator {
pub fn history_iter_scan(&self, code: u8, hash: &[u8], start_height: usize) -> ScanIterator {
self.store.history_db.iter_scan_from(
&TxHistoryRow::filter(code, &hash[..]),
&TxHistoryRow::prefix_height(code, &hash[..], start_height as u32),
Expand Down Expand Up @@ -595,7 +599,7 @@ impl ChainQuery {
}

// Get the height of a blockhash, only if its part of the best chain
fn height_by_hash(&self, hash: &Sha256dHash) -> Option<usize> {
pub fn height_by_hash(&self, hash: &Sha256dHash) -> Option<usize> {
self.store
.indexed_headers
.read()
Expand Down Expand Up @@ -1229,15 +1233,15 @@ impl TxHistoryRow {
}
}

fn from_row(row: DBRow) -> Self {
/// Decodes a raw db row back into a `TxHistoryRow`.
/// Keys are bincode-encoded big-endian so that rows sort correctly
/// when scanned by height prefix.
pub fn from_row(row: DBRow) -> Self {
    let mut config = bincode::config();
    config.big_endian();
    let key = config
        .deserialize(&row.key)
        .expect("failed to deserialize TxHistoryKey");
    TxHistoryRow { key }
}

fn get_txid(&self) -> Sha256dHash {
/// The txid of the transaction this history entry refers to, taken
/// from the entry's embedded `TxHistoryInfo`.
pub fn get_txid(&self) -> Sha256dHash {
    self.key.txinfo.get_txid()
}
fn get_outpoint(&self) -> OutPoint {
Expand Down

0 comments on commit 8f3a393

Please sign in to comment.