From fc9513c227fb110985c033ccce8540e43f82d0bf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 17 Oct 2021 00:04:08 -0700 Subject: [PATCH] Add new module for DeltaTableState (#464) * Add new module for delta table state. * Leverage type system to make sure table states are only constructed from a single checkpoint or commit --- rust/src/checkpoints.rs | 3 +- rust/src/delta.rs | 291 +++++----------------------------------- rust/src/lib.rs | 1 + rust/src/table_state.rs | 265 ++++++++++++++++++++++++++++++++++++ 4 files changed, 303 insertions(+), 257 deletions(-) create mode 100644 rust/src/table_state.rs diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs index 60b1be7e80..dd015398eb 100644 --- a/rust/src/checkpoints.rs +++ b/rust/src/checkpoints.rs @@ -17,8 +17,9 @@ use super::delta_arrow::delta_log_schema_for_table; use super::open_table_with_version; use super::schema::*; use super::storage::{StorageBackend, StorageError}; +use super::table_state::DeltaTableState; use super::writer::time_utils; -use super::{CheckPoint, DeltaTableError, DeltaTableState}; +use super::{CheckPoint, DeltaTableError}; use crate::DeltaTable; /// Error returned when there is an error during creating a checkpoint. diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 3be789f354..2500832b06 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -9,17 +9,12 @@ use futures::StreamExt; use lazy_static::lazy_static; use log::*; use parquet::errors::ParquetError; -use parquet::file::{ - reader::{FileReader, SerializedFileReader}, - serialized_reader::SliceableCursor, -}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; -use std::io::{BufRead, BufReader, Cursor}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{cmp::Ordering, collections::HashSet}; use uuid::Uuid; @@ -28,11 +23,11 @@ use crate::action::Stats; use super::action; use super::action::{Action, DeltaOperation}; -use super::delta_config; use super::partitions::{DeltaTablePartition, PartitionFilter}; use super::schema::*; use super::storage; use super::storage::{parse_uri, StorageBackend, StorageError, UriError}; +use super::table_state::DeltaTableState; use crate::delta_config::DeltaConfigError; /// Metadata for a checkpoint file @@ -369,117 +364,6 @@ impl From for LoadCheckpointError { } } -/// State snapshot currently held by the Delta Table instance. -#[derive(Default, Debug, Clone)] -pub struct DeltaTableState { - // A remove action should remain in the state of the table as a tombstone until it has expired. - // A tombstone expires when the creation timestamp of the delta file exceeds the expiration - tombstones: HashSet, - files: Vec, - commit_infos: Vec>, - app_transaction_version: HashMap, - min_reader_version: i32, - min_writer_version: i32, - current_metadata: Option, - tombstone_retention_millis: DeltaDataTypeLong, -} - -impl DeltaTableState { - /// Full list of tombstones (remove actions) representing files removed from table state). - pub fn all_tombstones(&self) -> &HashSet { - &self.tombstones - } - - /// List of unexpired tombstones (remove actions) representing files removed from table state. - /// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week. 
- pub fn unexpired_tombstones(&self) -> impl Iterator { - let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis; - self.tombstones - .iter() - .filter(move |t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp) - } - - /// Full list of add actions representing all parquet files that are part of the current - /// delta table state. - pub fn files(&self) -> &Vec { - self.files.as_ref() - } - - /// HashMap containing the last txn version stored for every app id writing txn - /// actions. - pub fn app_transaction_version(&self) -> &HashMap { - &self.app_transaction_version - } - - /// The min reader version required by the protocol. - pub fn min_reader_version(&self) -> i32 { - self.min_reader_version - } - - /// The min writer version required by the protocol. - pub fn min_writer_version(&self) -> i32 { - self.min_writer_version - } - - /// The most recent metadata of the table. - pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { - self.current_metadata.as_ref() - } - - /// merges new state information into our state - pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) { - if !new_state.tombstones.is_empty() { - let new_removals: HashSet<&str> = new_state - .tombstones - .iter() - .map(|s| s.path.as_str()) - .collect(); - - self.files - .retain(|a| !new_removals.contains(a.path.as_str())); - } - - if require_tombstones { - new_state.tombstones.into_iter().for_each(|r| { - self.tombstones.insert(r); - }); - - if !new_state.files.is_empty() { - let new_adds: HashSet<&str> = - new_state.files.iter().map(|s| s.path.as_str()).collect(); - self.tombstones - .retain(|a| !new_adds.contains(a.path.as_str())); - } - } - - self.files.append(&mut new_state.files); - - if new_state.min_reader_version > 0 { - self.min_reader_version = new_state.min_reader_version; - self.min_writer_version = new_state.min_writer_version; - } - - if new_state.current_metadata.is_some() { - self.tombstone_retention_millis = new_state.tombstone_retention_millis; - self.current_metadata = new_state.current_metadata.take(); - } - - new_state - .app_transaction_version - .drain() - .for_each(|(app_id, version)| { - *self - .app_transaction_version - .entry(app_id) - .or_insert(version) = version - }); - - if !new_state.commit_infos.is_empty() { - self.commit_infos.append(&mut new_state.commit_infos); - } - } -} - #[inline] /// Return path relative to parent_path fn extract_rel_path<'a, 'b>( @@ -651,12 +535,14 @@ pub struct DeltaTable { } impl DeltaTable { - fn commit_uri_from_version(&self, version: DeltaDataTypeVersion) -> String { + /// Return the uri of commit version. + pub fn commit_uri_from_version(&self, version: DeltaDataTypeVersion) -> String { let version = format!("{:020}.json", version); self.storage.join_path(&self.log_uri, &version) } - fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { + /// Return the list of paths of given checkpoint. 
+ pub fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix_pattern = format!("{:020}", check_point.version); let checkpoint_prefix = self .storage @@ -748,51 +634,17 @@ impl DeltaTable { Ok(cp) } - fn apply_log_from_bufread( - &mut self, - reader: BufReader, - ) -> Result<(), ApplyLogError> { - let mut new_state = DeltaTableState::default(); - for line in reader.lines() { - let action: Action = serde_json::from_str(line?.as_str())?; - process_action(&mut new_state, action, true)?; - } - + async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> { + let new_state = DeltaTableState::from_commit(self, version).await?; self.state.merge(new_state, self.config.require_tombstones); Ok(()) } - async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> { - let commit_uri = self.commit_uri_from_version(version); - let commit_log_bytes = self.storage.get_obj(&commit_uri).await?; - let reader = BufReader::new(Cursor::new(commit_log_bytes)); - - self.apply_log_from_bufread(reader) - } - async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> { - let checkpoint_data_paths = self.get_checkpoint_data_paths(&check_point); - // process actions from checkpoint - self.state = DeltaTableState::default(); - - for f in &checkpoint_data_paths { - let obj = self.storage.get_obj(f).await?; - let preader = SerializedFileReader::new(SliceableCursor::new(obj))?; - let schema = preader.metadata().file_metadata().schema(); - if !schema.is_group() { - return Err(DeltaTableError::from(action::ActionError::Generic( - "Action record in checkpoint should be a struct".to_string(), - ))); - } - for record in preader.get_row_iter(None)? { - process_action( - &mut self.state, - Action::from_parquet_record(schema, &record)?, - self.config.require_tombstones, - )?; - } - } + self.state = + DeltaTableState::from_checkpoint(self, &check_point, self.config.require_tombstones) + .await?; Ok(()) } @@ -970,7 +822,7 @@ impl DeltaTable { &mut self, limit: Option, ) -> Result>, DeltaTableError> { - let commit_infos_list = self.state.commit_infos.iter().rev().map(Map::clone); + let commit_infos_list = self.state.commit_infos().iter().rev().map(Map::clone); match limit { Some(l) => Ok(commit_infos_list.take(l).collect()), None => Ok(commit_infos_list.collect()), @@ -985,7 +837,7 @@ impl DeltaTable { ) -> Result, DeltaTableError> { let files = self .state - .files + .files() .iter() .filter(|add| { let partitions = add @@ -1029,13 +881,13 @@ impl DeltaTable { /// Return a refernece to all active "add" actions present in the loaded state pub fn get_active_add_actions(&self) -> &Vec { - &self.state.files + self.state.files() } /// Returns an iterator of file names present in the loaded state #[inline] pub fn get_files_iter(&self) -> impl Iterator { - self.state.files.iter().map(|add| add.path.as_str()) + self.state.files().iter().map(|add| add.path.as_str()) } /// Returns a collection of file names present in the loaded state @@ -1047,7 +899,7 @@ impl DeltaTable { /// Returns file names present in the loaded state in HashSet pub fn get_file_set(&self) -> HashSet<&str> { self.state - .files + .files() .iter() .map(|add| add.path.as_str()) .collect() @@ -1065,7 +917,7 @@ impl DeltaTable { /// Returns a URIs for all active files present in the current table version. 
pub fn get_file_uris(&self) -> Vec { self.state - .files + .files() .iter() .map(|add| self.storage.join_path(&self.table_uri, &add.path)) .collect() @@ -1074,7 +926,7 @@ impl DeltaTable { /// Returns statistics for files, in order pub fn get_stats(&self) -> Vec, DeltaTableError>> { self.state - .files + .files() .iter() .map(|add| add.get_stats().map_err(DeltaTableError::from)) .collect() @@ -1088,8 +940,7 @@ impl DeltaTable { /// Returns the metadata associated with the loaded state. pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> { self.state - .current_metadata - .as_ref() + .current_metadata() .ok_or(DeltaTableError::NoMetadata) } @@ -1100,19 +951,19 @@ impl DeltaTable { /// Returns the current version of the DeltaTable based on the loaded metadata. pub fn get_app_transaction_version(&self) -> &HashMap { - &self.state.app_transaction_version + self.state.app_transaction_version() } /// Returns the minimum reader version supported by the DeltaTable based on the loaded /// metadata. pub fn get_min_reader_version(&self) -> i32 { - self.state.min_reader_version + self.state.min_reader_version() } /// Returns the minimum writer version supported by the DeltaTable based on the loaded /// metadata. pub fn get_min_writer_version(&self) -> i32 { - self.state.min_writer_version + self.state.min_writer_version() } /// List files no longer referenced by a Delta table and are older than the retention threshold. @@ -1122,9 +973,9 @@ impl DeltaTable { ) -> Result, DeltaTableError> { let retention_millis = retention_hours .map(|hours| 3600000 * hours as i64) - .unwrap_or(self.state.tombstone_retention_millis); + .unwrap_or_else(|| self.state.tombstone_retention_millis()); - if retention_millis < self.state.tombstone_retention_millis { + if retention_millis < self.state.tombstone_retention_millis() { return Err(DeltaTableError::InvalidVacuumRetentionPeriod); } @@ -1153,8 +1004,7 @@ impl DeltaTable { && !path_name.starts_with("_change_data") && !self .state - .current_metadata - .as_ref() + .current_metadata() .ok_or(DeltaTableError::NoMetadata)? .partition_columns .iter() @@ -1217,7 +1067,7 @@ impl DeltaTable { /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or /// no metadata was found in the log. pub fn schema(&self) -> Option<&Schema> { - self.state.current_metadata.as_ref().map(|m| &m.schema) + self.state.current_metadata().map(|m| &m.schema) } /// Return table schema parsed from transaction log. Return `DeltaTableError` if table hasn't @@ -1230,8 +1080,7 @@ impl DeltaTable { pub fn get_configurations(&self) -> Result<&HashMap>, DeltaTableError> { Ok(self .state - .current_metadata - .as_ref() + .current_metadata() .ok_or(DeltaTableError::NoMetadata)? 
.get_configuration()) } @@ -1324,13 +1173,10 @@ impl DeltaTable { transaction.add_actions(actions.clone()); let prepared_commit = transaction.prepare_commit(None).await?; - self.try_commit_transaction(&prepared_commit, 0).await?; + let committed_version = self.try_commit_transaction(&prepared_commit, 0).await?; - // Mutate the DeltaTable's state using process_action() - // in order to get most up-to-date state based on the commit above - for action in actions { - let _ = process_action(&mut self.state, action, self.config.require_tombstones)?; - } + let new_state = DeltaTableState::from_commit(self, committed_version).await?; + self.state.merge(new_state, self.config.require_tombstones); Ok(()) } @@ -1380,7 +1226,7 @@ impl fmt::Display for DeltaTable { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!(f, "DeltaTable({})", self.table_uri)?; writeln!(f, "\tversion: {}", self.version)?; - match self.state.current_metadata.as_ref() { + match self.state.current_metadata() { Some(metadata) => { writeln!(f, "\tmetadata: {}", metadata)?; } @@ -1391,9 +1237,10 @@ impl fmt::Display for DeltaTable { writeln!( f, "\tmin_version: read={}, write={}", - self.state.min_reader_version, self.state.min_writer_version + self.state.min_reader_version(), + self.state.min_writer_version() )?; - writeln!(f, "\tfiles count: {}", self.state.files.len()) + writeln!(f, "\tfiles count: {}", self.state.files().len()) } } @@ -1661,46 +1508,6 @@ fn log_entry_from_actions(actions: &[Action]) -> Result Result<(), ApplyLogError> { - match action { - Action::add(v) => { - state.files.push(v.path_decoded()?); - } - Action::remove(v) => { - if handle_tombstones { - let v = v.path_decoded()?; - state.tombstones.insert(v); - } - } - Action::protocol(v) => { - state.min_reader_version = v.min_reader_version; - state.min_writer_version = v.min_writer_version; - } - Action::metaData(v) => { - let md = DeltaTableMetaData::try_from(v)?; - state.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION - .get_interval_from_metadata(&md)? - .as_millis() as i64; - state.current_metadata = Some(md); - } - Action::txn(v) => { - *state - .app_transaction_version - .entry(v.app_id) - .or_insert(v.version) = v.version; - } - Action::commitInfo(v) => { - state.commit_infos.push(v); - } - } - - Ok(()) -} - /// Creates and loads a DeltaTable from the given path with current metadata. /// Infers the storage backend to use from the scheme in the given table path. 
pub async fn open_table(table_uri: &str) -> Result { @@ -1742,37 +1549,9 @@ pub fn crate_version() -> &'static str { mod tests { use super::*; use pretty_assertions::assert_eq; + use std::io::{BufRead, BufReader}; use std::{collections::HashMap, fs::File, path::Path}; - #[test] - fn state_records_new_txn_version() { - let mut app_transaction_version = HashMap::new(); - app_transaction_version.insert("abc".to_string(), 1); - app_transaction_version.insert("xyz".to_string(), 1); - - let mut state = DeltaTableState { - files: vec![], - commit_infos: vec![], - tombstones: HashSet::new(), - current_metadata: None, - min_reader_version: 1, - min_writer_version: 2, - app_transaction_version, - tombstone_retention_millis: 0, - }; - - let txn_action = Action::txn(action::Txn { - app_id: "abc".to_string(), - version: 2, - last_updated: Some(0), - }); - - let _ = process_action(&mut state, txn_action, false).unwrap(); - - assert_eq!(2, *state.app_transaction_version.get("abc").unwrap()); - assert_eq!(1, *state.app_transaction_version.get("xyz").unwrap()); - } - #[cfg(feature = "s3")] #[test] fn normalize_table_uri() { @@ -1888,7 +1667,7 @@ mod tests { // Validation // assert DeltaTable version is now 0 and no data files have been added assert_eq!(dt.version, 0); - assert_eq!(dt.state.files.len(), 0); + assert_eq!(dt.state.files().len(), 0); // assert new _delta_log file created in tempDir let table_path = Path::new(&dt.table_uri); diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 9ea9d24f10..a0f36a779d 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -67,6 +67,7 @@ pub mod delta_config; pub mod partitions; mod schema; pub mod storage; +mod table_state; pub mod writer; #[cfg(feature = "datafusion-ext")] diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs new file mode 100644 index 0000000000..6c0134937e --- /dev/null +++ b/rust/src/table_state.rs @@ -0,0 +1,265 @@ +//! The module for delta table state. + +use chrono::Utc; +use parquet::file::{ + reader::{FileReader, SerializedFileReader}, + serialized_reader::SliceableCursor, +}; +use serde_json::{Map, Value}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::TryFrom; +use std::io::{BufRead, BufReader, Cursor}; + +use super::{ + ApplyLogError, CheckPoint, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, + DeltaTableError, DeltaTableMetaData, +}; +use crate::action; +use crate::delta_config; + +/// State snapshot currently held by the Delta Table instance. +#[derive(Default, Debug, Clone)] +pub struct DeltaTableState { + // A remove action should remain in the state of the table as a tombstone until it has expired. + // A tombstone expires when the creation timestamp of the delta file exceeds the expiration + tombstones: HashSet, + files: Vec, + commit_infos: Vec>, + app_transaction_version: HashMap, + min_reader_version: i32, + min_writer_version: i32, + current_metadata: Option, + tombstone_retention_millis: DeltaDataTypeLong, +} + +impl DeltaTableState { + /// Construct a delta table state object from commit version. 
+ pub async fn from_commit( + table: &DeltaTable, + version: DeltaDataTypeVersion, + ) -> Result { + let commit_uri = table.commit_uri_from_version(version); + let commit_log_bytes = table.storage.get_obj(&commit_uri).await?; + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + + let mut new_state = DeltaTableState::default(); + for line in reader.lines() { + let action: action::Action = serde_json::from_str(line?.as_str())?; + new_state.process_action(action, true)?; + } + + Ok(new_state) + } + + /// Construct a delta table state object from checkpoint. + pub async fn from_checkpoint( + table: &DeltaTable, + check_point: &CheckPoint, + require_tombstones: bool, + ) -> Result { + let checkpoint_data_paths = table.get_checkpoint_data_paths(check_point); + // process actions from checkpoint + let mut new_state = DeltaTableState::default(); + + for f in &checkpoint_data_paths { + let obj = table.storage.get_obj(f).await?; + let preader = SerializedFileReader::new(SliceableCursor::new(obj))?; + let schema = preader.metadata().file_metadata().schema(); + if !schema.is_group() { + return Err(DeltaTableError::from(action::ActionError::Generic( + "Action record in checkpoint should be a struct".to_string(), + ))); + } + for record in preader.get_row_iter(None)? { + new_state.process_action( + action::Action::from_parquet_record(schema, &record)?, + require_tombstones, + )?; + } + } + + Ok(new_state) + } + + /// List of commit info maps. + pub fn commit_infos(&self) -> &Vec> { + &self.commit_infos + } + + /// Retention of tombstone in milliseconds. + pub fn tombstone_retention_millis(&self) -> DeltaDataTypeLong { + self.tombstone_retention_millis + } + + /// Full list of tombstones (remove actions) representing files removed from table state). + pub fn all_tombstones(&self) -> &HashSet { + &self.tombstones + } + + /// List of unexpired tombstones (remove actions) representing files removed from table state. + /// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week. + pub fn unexpired_tombstones(&self) -> impl Iterator { + let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis; + self.tombstones + .iter() + .filter(move |t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp) + } + + /// Full list of add actions representing all parquet files that are part of the current + /// delta table state. + pub fn files(&self) -> &Vec { + self.files.as_ref() + } + + /// HashMap containing the last txn version stored for every app id writing txn + /// actions. + pub fn app_transaction_version(&self) -> &HashMap { + &self.app_transaction_version + } + + /// The min reader version required by the protocol. + pub fn min_reader_version(&self) -> i32 { + self.min_reader_version + } + + /// The min writer version required by the protocol. + pub fn min_writer_version(&self) -> i32 { + self.min_writer_version + } + + /// The most recent metadata of the table. 
+ pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { + self.current_metadata.as_ref() + } + + /// merges new state information into our state + pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) { + if !new_state.tombstones.is_empty() { + let new_removals: HashSet<&str> = new_state + .tombstones + .iter() + .map(|s| s.path.as_str()) + .collect(); + + self.files + .retain(|a| !new_removals.contains(a.path.as_str())); + } + + if require_tombstones { + new_state.tombstones.into_iter().for_each(|r| { + self.tombstones.insert(r); + }); + + if !new_state.files.is_empty() { + let new_adds: HashSet<&str> = + new_state.files.iter().map(|s| s.path.as_str()).collect(); + self.tombstones + .retain(|a| !new_adds.contains(a.path.as_str())); + } + } + + self.files.append(&mut new_state.files); + + if new_state.min_reader_version > 0 { + self.min_reader_version = new_state.min_reader_version; + self.min_writer_version = new_state.min_writer_version; + } + + if new_state.current_metadata.is_some() { + self.tombstone_retention_millis = new_state.tombstone_retention_millis; + self.current_metadata = new_state.current_metadata.take(); + } + + new_state + .app_transaction_version + .drain() + .for_each(|(app_id, version)| { + *self + .app_transaction_version + .entry(app_id) + .or_insert(version) = version + }); + + if !new_state.commit_infos.is_empty() { + self.commit_infos.append(&mut new_state.commit_infos); + } + } + + /// Process given action by updating current state. + fn process_action( + &mut self, + action: action::Action, + handle_tombstones: bool, + ) -> Result<(), ApplyLogError> { + match action { + action::Action::add(v) => { + self.files.push(v.path_decoded()?); + } + action::Action::remove(v) => { + if handle_tombstones { + let v = v.path_decoded()?; + self.tombstones.insert(v); + } + } + action::Action::protocol(v) => { + self.min_reader_version = v.min_reader_version; + self.min_writer_version = v.min_writer_version; + } + action::Action::metaData(v) => { + let md = DeltaTableMetaData::try_from(v)?; + self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION + .get_interval_from_metadata(&md)? + .as_millis() as i64; + self.current_metadata = Some(md); + } + action::Action::txn(v) => { + *self + .app_transaction_version + .entry(v.app_id) + .or_insert(v.version) = v.version; + } + action::Action::commitInfo(v) => { + self.commit_infos.push(v); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use std::collections::HashMap; + + #[test] + fn state_records_new_txn_version() { + let mut app_transaction_version = HashMap::new(); + app_transaction_version.insert("abc".to_string(), 1); + app_transaction_version.insert("xyz".to_string(), 1); + + let mut state = DeltaTableState { + files: vec![], + commit_infos: vec![], + tombstones: HashSet::new(), + current_metadata: None, + min_reader_version: 1, + min_writer_version: 2, + app_transaction_version, + tombstone_retention_millis: 0, + }; + + let txn_action = action::Action::txn(action::Txn { + app_id: "abc".to_string(), + version: 2, + last_updated: Some(0), + }); + + let _ = state.process_action(txn_action, false).unwrap(); + + assert_eq!(2, *state.app_transaction_version().get("abc").unwrap()); + assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap()); + } +}
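
As a reading aid for this patch, here is a minimal sketch (not part of the change itself) of how the relocated constructors and `merge` are meant to compose after the refactor. It assumes the code lives inside rust/src/delta.rs, where `CheckPoint`, `DeltaTable`, `DeltaTableError`, and the private `table_state` module are in scope; the helper name `update_from_checkpoint` is hypothetical, and the `?` on `from_commit` relies on the crate's existing `ApplyLogError` to `DeltaTableError` conversion.

// Hypothetical helper inside rust/src/delta.rs, sketched against this patch: build the
// base state from one checkpoint, then fold each later commit in through `merge`.
use super::table_state::DeltaTableState;

async fn update_from_checkpoint(
    table: &mut DeltaTable,
    check_point: CheckPoint,
    target_version: DeltaDataTypeVersion,
) -> Result<(), DeltaTableError> {
    let require_tombstones = table.config.require_tombstones;

    // The base state is constructed from exactly one checkpoint...
    let mut state =
        DeltaTableState::from_checkpoint(table, &check_point, require_tombstones).await?;

    // ...and every later commit yields its own state object, which is folded in via
    // `merge`, so a snapshot never mixes partially-read checkpoint and commit data.
    for version in (check_point.version + 1)..=target_version {
        let incremental = DeltaTableState::from_commit(table, version).await?;
        state.merge(incremental, require_tombstones);
    }

    table.state = state;
    table.version = target_version;
    Ok(())
}

Outside table_state.rs the struct's fields are private, so `from_commit` and `from_checkpoint` (plus `Default`) are the only ways to obtain a populated state. That is the type-system guarantee the commit message calls out: delta.rs now only decides when to merge snapshots, while all action parsing lives in the new module.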