#![warn(missing_docs)]
pub mod light;
pub mod offchain;
#[cfg(any(feature = "with-kvdb-rocksdb", test))]
pub mod bench;
mod children;
mod cache;
mod changes_tries_storage;
mod storage_cache;
#[cfg(any(feature = "with-kvdb-rocksdb", test))]
mod upgrade;
mod utils;
mod stats;
#[cfg(feature = "with-parity-db")]
mod parity_db;
use std::sync::Arc;
use std::path::{Path, PathBuf};
use std::io;
use std::collections::{HashMap, HashSet};
use sc_client_api::{
UsageInfo, MemoryInfo, IoInfo, MemorySize,
backend::{NewBlockState, PrunableStateChangesTrieStorage, ProvideChtRoots},
leaves::{LeafSet, FinalizationDisplaced}, cht,
};
use sp_blockchain::{
Result as ClientResult, Error as ClientError,
well_known_cache_keys, HeaderBackend,
};
use codec::{Decode, Encode};
use hash_db::Prefix;
use sp_trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use sp_database::Transaction;
use parking_lot::RwLock;
use sp_core::ChangesTrieConfiguration;
use sp_core::offchain::storage::{OffchainOverlayedChange, OffchainOverlayedChanges};
use sp_core::storage::{well_known_keys, ChildInfo};
use sp_arithmetic::traits::Saturating;
use sp_runtime::{generic::{DigestItem, BlockId}, Justification, Storage};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, One, SaturatedConversion, HashFor,
};
use sp_state_machine::{
DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, UsageInfo as StateUsageInfo,
StorageCollection, ChildStorageCollection,
backend::Backend as StateBackend, StateMachineStats,
};
use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta};
use crate::changes_tries_storage::{DbChangesTrieStorage, DbChangesTrieStorageTransaction};
use sc_state_db::StateDb;
use sp_blockchain::{CachedHeaderMetadata, HeaderMetadata, HeaderMetadataCache};
use crate::storage_cache::{CachingState, SyncingCachingState, SharedCache, new_shared_cache};
use crate::stats::StateUsageStats;
use log::{trace, debug, warn};
pub use sp_database::Database;
pub use sc_state_db::PruningMode;
#[cfg(any(feature = "with-kvdb-rocksdb", test))]
pub use bench::BenchmarkingState;
const MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR: u32 = 32768;
const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10);
pub type DbState<B> = sp_state_machine::TrieBackend<
Arc<dyn sp_state_machine::Storage<HashFor<B>>>, HashFor<B>
>;
const DB_HASH_LEN: usize = 32;
pub type DbHash = [u8; DB_HASH_LEN];
pub struct RefTrackingState<Block: BlockT> {
state: DbState<Block>,
storage: Arc<StorageDb<Block>>,
parent_hash: Option<Block::Hash>,
}
impl<B: BlockT> RefTrackingState<B> {
fn new(state: DbState<B>, storage: Arc<StorageDb<B>>, parent_hash: Option<B::Hash>) -> Self {
RefTrackingState {
state,
parent_hash,
storage,
}
}
}
impl<B: BlockT> Drop for RefTrackingState<B> {
fn drop(&mut self) {
if let Some(hash) = &self.parent_hash {
self.storage.state_db.unpin(hash);
}
}
}
impl<Block: BlockT> std::fmt::Debug for RefTrackingState<Block> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Block {:?}", self.parent_hash)
}
}
impl<B: BlockT> StateBackend<HashFor<B>> for RefTrackingState<B> {
type Error = <DbState<B> as StateBackend<HashFor<B>>>::Error;
type Transaction = <DbState<B> as StateBackend<HashFor<B>>>::Transaction;
type TrieBackendStorage = <DbState<B> as StateBackend<HashFor<B>>>::TrieBackendStorage;
fn storage(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.storage(key)
}
fn storage_hash(&self, key: &[u8]) -> Result<Option<B::Hash>, Self::Error> {
self.state.storage_hash(key)
}
fn child_storage(
&self,
child_info: &ChildInfo,
key: &[u8],
) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.child_storage(child_info, key)
}
fn exists_storage(&self, key: &[u8]) -> Result<bool, Self::Error> {
self.state.exists_storage(key)
}
fn exists_child_storage(
&self,
child_info: &ChildInfo,
key: &[u8],
) -> Result<bool, Self::Error> {
self.state.exists_child_storage(child_info, key)
}
fn next_storage_key(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.next_storage_key(key)
}
fn next_child_storage_key(
&self,
child_info: &ChildInfo,
key: &[u8],
) -> Result<Option<Vec<u8>>, Self::Error> {
self.state.next_child_storage_key(child_info, key)
}
fn for_keys_with_prefix<F: FnMut(&[u8])>(&self, prefix: &[u8], f: F) {
self.state.for_keys_with_prefix(prefix, f)
}
fn for_key_values_with_prefix<F: FnMut(&[u8], &[u8])>(&self, prefix: &[u8], f: F) {
self.state.for_key_values_with_prefix(prefix, f)
}
fn for_keys_in_child_storage<F: FnMut(&[u8])>(
&self,
child_info: &ChildInfo,
f: F,
) {
self.state.for_keys_in_child_storage(child_info, f)
}
fn for_child_keys_with_prefix<F: FnMut(&[u8])>(
&self,
child_info: &ChildInfo,
prefix: &[u8],
f: F,
) {
self.state.for_child_keys_with_prefix(child_info, prefix, f)
}
fn storage_root<'a>(
&self,
delta: impl Iterator<Item=(&'a [u8], Option<&'a [u8]>)>,
) -> (B::Hash, Self::Transaction) where B::Hash: Ord {
self.state.storage_root(delta)
}
fn child_storage_root<'a>(
&self,
child_info: &ChildInfo,
delta: impl Iterator<Item=(&'a [u8], Option<&'a [u8]>)>,
) -> (B::Hash, bool, Self::Transaction) where B::Hash: Ord {
self.state.child_storage_root(child_info, delta)
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
self.state.pairs()
}
fn keys(&self, prefix: &[u8]) -> Vec<Vec<u8>> {
self.state.keys(prefix)
}
fn child_keys(
&self,
child_info: &ChildInfo,
prefix: &[u8],
) -> Vec<Vec<u8>> {
self.state.child_keys(child_info, prefix)
}
fn as_trie_backend(&mut self)
-> Option<&sp_state_machine::TrieBackend<Self::TrieBackendStorage, HashFor<B>>>
{
self.state.as_trie_backend()
}
fn register_overlay_stats(&mut self, stats: &StateMachineStats) {
self.state.register_overlay_stats(stats);
}
fn usage_info(&self) -> StateUsageInfo {
self.state.usage_info()
}
}
pub struct DatabaseSettings {
pub state_cache_size: usize,
pub state_cache_child_ratio: Option<(usize, usize)>,
pub pruning: PruningMode,
pub source: DatabaseSettingsSrc,
}
#[derive(Debug, Clone)]
pub enum DatabaseSettingsSrc {
RocksDb {
path: PathBuf,
cache_size: usize,
},
ParityDb {
path: PathBuf,
},
Custom(Arc<dyn Database<DbHash>>),
}
impl DatabaseSettingsSrc {
pub fn path(&self) -> Option<&Path> {
match self {
DatabaseSettingsSrc::RocksDb { path, .. } => Some(path.as_path()),
DatabaseSettingsSrc::ParityDb { path, .. } => Some(path.as_path()),
DatabaseSettingsSrc::Custom(_) => None,
}
}
pub fn supports_ref_counting(&self) -> bool {
match self {
DatabaseSettingsSrc::ParityDb { .. } => true,
_ => false,
}
}
}
impl std::fmt::Display for DatabaseSettingsSrc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = match self {
DatabaseSettingsSrc::RocksDb { .. } => "RocksDb",
DatabaseSettingsSrc::ParityDb { .. } => "ParityDb",
DatabaseSettingsSrc::Custom(_) => "Custom",
};
write!(f, "{}", name)
}
}
pub(crate) mod columns {
pub const META: u32 = crate::utils::COLUMN_META;
pub const STATE: u32 = 1;
pub const STATE_META: u32 = 2;
pub const KEY_LOOKUP: u32 = 3;
pub const HEADER: u32 = 4;
pub const BODY: u32 = 5;
pub const JUSTIFICATION: u32 = 6;
pub const CHANGES_TRIE: u32 = 7;
pub const AUX: u32 = 8;
pub const OFFCHAIN: u32 = 9;
pub const CACHE: u32 = 10;
}
struct PendingBlock<Block: BlockT> {
header: Block::Header,
justification: Option<Justification>,
body: Option<Vec<Block::Extrinsic>>,
leaf_state: NewBlockState,
}
struct StateMetaDb<'a>(&'a dyn Database<DbHash>);
impl<'a> sc_state_db::MetaDb for StateMetaDb<'a> {
type Error = io::Error;
fn get_meta(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.0.get(columns::STATE_META, key))
}
}
pub struct BlockchainDb<Block: BlockT> {
db: Arc<dyn Database<DbHash>>,
meta: Arc<RwLock<Meta<NumberFor<Block>, Block::Hash>>>,
leaves: RwLock<LeafSet<Block::Hash, NumberFor<Block>>>,
header_metadata_cache: Arc<HeaderMetadataCache<Block>>,
}
impl<Block: BlockT> BlockchainDb<Block> {
fn new(db: Arc<dyn Database<DbHash>>) -> ClientResult<Self> {
let meta = read_meta::<Block>(&*db, columns::HEADER)?;
let leaves = LeafSet::read_from_db(&*db, columns::META, meta_keys::LEAF_PREFIX)?;
Ok(BlockchainDb {
db,
leaves: RwLock::new(leaves),
meta: Arc::new(RwLock::new(meta)),
header_metadata_cache: Arc::new(HeaderMetadataCache::default()),
})
}
fn update_meta(
&self,
hash: Block::Hash,
number: <Block::Header as HeaderT>::Number,
is_best: bool,
is_finalized: bool
) {
let mut meta = self.meta.write();
if number.is_zero() {
meta.genesis_hash = hash;
meta.finalized_hash = hash;
}
if is_best {
meta.best_number = number;
meta.best_hash = hash;
}
if is_finalized {
meta.finalized_number = number;
meta.finalized_hash = hash;
}
}
fn changes_trie_root(&self, block: BlockId<Block>) -> ClientResult<Option<Block::Hash>> {
self.header(block)
.map(|header| header.and_then(|header|
header.digest().log(DigestItem::as_changes_trie_root)
.cloned()))
}
}
impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for BlockchainDb<Block> {
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
utils::read_header(&*self.db, columns::KEY_LOOKUP, columns::HEADER, id)
}
fn info(&self) -> sc_client_api::blockchain::Info<Block> {
let meta = self.meta.read();
sc_client_api::blockchain::Info {
best_hash: meta.best_hash,
best_number: meta.best_number,
genesis_hash: meta.genesis_hash,
finalized_hash: meta.finalized_hash,
finalized_number: meta.finalized_number,
number_leaves: self.leaves.read().count(),
}
}
fn status(&self, id: BlockId<Block>) -> ClientResult<sc_client_api::blockchain::BlockStatus> {
let exists = match id {
BlockId::Hash(_) => read_db(
&*self.db,
columns::KEY_LOOKUP,
columns::HEADER,
id
)?.is_some(),
BlockId::Number(n) => n <= self.meta.read().best_number,
};
match exists {
true => Ok(sc_client_api::blockchain::BlockStatus::InChain),
false => Ok(sc_client_api::blockchain::BlockStatus::Unknown),
}
}
fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number))
}
fn hash(&self, number: NumberFor<Block>) -> ClientResult<Option<Block::Hash>> {
self.header(BlockId::Number(number)).and_then(|maybe_header| match maybe_header {
Some(header) => Ok(Some(header.hash().clone())),
None => Ok(None),
})
}
}
impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, id)? {
Some(body) => match Decode::decode(&mut &body[..]) {
Ok(body) => Ok(Some(body)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding body: {}", err)
)),
}
None => Ok(None),
}
}
fn justification(&self, id: BlockId<Block>) -> ClientResult<Option<Justification>> {
match read_db(&*self.db, columns::KEY_LOOKUP, columns::JUSTIFICATION, id)? {
Some(justification) => match Decode::decode(&mut &justification[..]) {
Ok(justification) => Ok(Some(justification)),
Err(err) => return Err(sp_blockchain::Error::Backend(
format!("Error decoding justification: {}", err)
)),
}
None => Ok(None),
}
}
fn last_finalized(&self) -> ClientResult<Block::Hash> {
Ok(self.meta.read().finalized_hash.clone())
}
fn cache(&self) -> Option<Arc<dyn sc_client_api::blockchain::Cache<Block>>> {
None
}
fn leaves(&self) -> ClientResult<Vec<Block::Hash>> {
Ok(self.leaves.read().hashes())
}
fn children(&self, parent_hash: Block::Hash) -> ClientResult<Vec<Block::Hash>> {
children::read_children(&*self.db, columns::META, meta_keys::CHILDREN_PREFIX, parent_hash)
}
}
impl<Block: BlockT> sc_client_api::blockchain::ProvideCache<Block> for BlockchainDb<Block> {
fn cache(&self) -> Option<Arc<dyn sc_client_api::blockchain::Cache<Block>>> {
None
}
}
impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
type Error = sp_blockchain::Error;
fn header_metadata(&self, hash: Block::Hash) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
self.header_metadata_cache.header_metadata(hash).map_or_else(|| {
self.header(BlockId::hash(hash))?.map(|header| {
let header_metadata = CachedHeaderMetadata::from(&header);
self.header_metadata_cache.insert_header_metadata(
header_metadata.hash,
header_metadata.clone(),
);
header_metadata
}).ok_or_else(|| ClientError::UnknownBlock(format!("header not found in db: {}", hash)))
}, Ok)
}
fn insert_header_metadata(&self, hash: Block::Hash, metadata: CachedHeaderMetadata<Block>) {
self.header_metadata_cache.insert_header_metadata(hash, metadata)
}
fn remove_header_metadata(&self, hash: Block::Hash) {
self.header_metadata_cache.remove_header_metadata(hash);
}
}
impl<Block: BlockT> ProvideChtRoots<Block> for BlockchainDb<Block> {
fn header_cht_root(
&self,
cht_size: NumberFor<Block>,
block: NumberFor<Block>,
) -> sp_blockchain::Result<Option<Block::Hash>> {
let cht_number = match cht::block_to_cht_number(cht_size, block) {
Some(number) => number,
None => return Ok(None),
};
let cht_start: NumberFor<Block> = cht::start_number(cht::size(), cht_number);
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
cht::compute_root::<Block::Header, HashFor<Block>, _>(
cht::size(), cht_number, cht_range.map(|num| self.hash(num))
).map(Some)
}
fn changes_trie_cht_root(
&self,
cht_size: NumberFor<Block>,
block: NumberFor<Block>,
) -> sp_blockchain::Result<Option<Block::Hash>> {
let cht_number = match cht::block_to_cht_number(cht_size, block) {
Some(number) => number,
None => return Ok(None),
};
let cht_start: NumberFor<Block> = cht::start_number(cht::size(), cht_number);
let mut current_num = cht_start;
let cht_range = ::std::iter::from_fn(|| {
let old_current_num = current_num;
current_num = current_num + One::one();
Some(old_current_num)
});
cht::compute_root::<Block::Header, HashFor<Block>, _>(
cht::size(),
cht_number,
cht_range.map(|num| self.changes_trie_root(BlockId::Number(num))),
).map(Some)
}
}
pub struct BlockImportOperation<Block: BlockT> {
old_state: SyncingCachingState<RefTrackingState<Block>, Block>,
db_updates: PrefixedMemoryDB<HashFor<Block>>,
storage_updates: StorageCollection,
child_storage_updates: ChildStorageCollection,
offchain_storage_updates: OffchainOverlayedChanges,
changes_trie_updates: MemoryDB<HashFor<Block>>,
changes_trie_build_cache_update: Option<ChangesTrieCacheAction<Block::Hash, NumberFor<Block>>>,
changes_trie_config_update: Option<Option<ChangesTrieConfiguration>>,
pending_block: Option<PendingBlock<Block>>,
aux_ops: Vec<(Vec<u8>, Option<Vec<u8>>)>,
finalized_blocks: Vec<(BlockId<Block>, Option<Justification>)>,
set_head: Option<BlockId<Block>>,
commit_state: bool,
}
impl<Block: BlockT> BlockImportOperation<Block> {
fn apply_offchain(&mut self, transaction: &mut Transaction<DbHash>) {
for ((prefix, key), value_operation) in self.offchain_storage_updates.drain() {
let key: Vec<u8> = prefix
.into_iter()
.chain(sp_core::sp_std::iter::once(b'/'))
.chain(key.into_iter())
.collect();
match value_operation {
OffchainOverlayedChange::SetValue(val) => transaction.set_from_vec(columns::OFFCHAIN, &key, val),
OffchainOverlayedChange::Remove => transaction.remove(columns::OFFCHAIN, &key),
}
}
}
fn apply_aux(&mut self, transaction: &mut Transaction<DbHash>) {
for (key, maybe_val) in self.aux_ops.drain(..) {
match maybe_val {
Some(val) => transaction.set_from_vec(columns::AUX, &key, val),
None => transaction.remove(columns::AUX, &key),
}
}
}
}
impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for BlockImportOperation<Block> {
type State = SyncingCachingState<RefTrackingState<Block>, Block>;
fn state(&self) -> ClientResult<Option<&Self::State>> {
Ok(Some(&self.old_state))
}
fn set_block_data(
&mut self,
header: Block::Header,
body: Option<Vec<Block::Extrinsic>>,
justification: Option<Justification>,
leaf_state: NewBlockState,
) -> ClientResult<()> {
assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
if let Some(changes_trie_config_update) = changes_tries_storage::extract_new_configuration(&header) {
self.changes_trie_config_update = Some(changes_trie_config_update.clone());
}
self.pending_block = Some(PendingBlock {
header,
body,
justification,
leaf_state,
});
Ok(())
}
fn update_cache(&mut self, _cache: HashMap<well_known_cache_keys::Id, Vec<u8>>) {
}
fn update_db_storage(&mut self, update: PrefixedMemoryDB<HashFor<Block>>) -> ClientResult<()> {
self.db_updates = update;
Ok(())
}
fn reset_storage(
&mut self,
storage: Storage,
) -> ClientResult<Block::Hash> {
if storage.top.keys().any(|k| well_known_keys::is_child_storage_key(&k)) {
return Err(sp_blockchain::Error::GenesisInvalid.into());
}
let child_delta = storage.children_default.iter().map(|(_storage_key, child_content)|(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
));
let mut changes_trie_config: Option<ChangesTrieConfiguration> = None;
let (root, transaction) = self.old_state.full_storage_root(
storage.top.iter().map(|(k, v)| {
if &k[..] == well_known_keys::CHANGES_TRIE_CONFIG {
changes_trie_config = Some(
Decode::decode(&mut &v[..])
.expect("changes trie configuration is encoded properly at genesis")
);
}
(&k[..], Some(&v[..]))
}),
child_delta
);
self.db_updates = transaction;
self.changes_trie_config_update = Some(changes_trie_config);
self.commit_state = true;
Ok(root)
}
fn update_changes_trie(
&mut self,
update: ChangesTrieTransaction<HashFor<Block>, NumberFor<Block>>,
) -> ClientResult<()> {
self.changes_trie_updates = update.0;
self.changes_trie_build_cache_update = Some(update.1);
Ok(())
}
fn insert_aux<I>(&mut self, ops: I) -> ClientResult<()>
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)>
{
self.aux_ops.append(&mut ops.into_iter().collect());
Ok(())
}
fn update_storage(
&mut self,
update: StorageCollection,
child_update: ChildStorageCollection,
) -> ClientResult<()> {
self.storage_updates = update;
self.child_storage_updates = child_update;
Ok(())
}
fn update_offchain_storage(
&mut self,
offchain_update: OffchainOverlayedChanges,
) -> ClientResult<()> {
self.offchain_storage_updates = offchain_update;
Ok(())
}
fn mark_finalized(
&mut self,
block: BlockId<Block>,
justification: Option<Justification>,
) -> ClientResult<()> {
self.finalized_blocks.push((block, justification));
Ok(())
}
fn mark_head(&mut self, block: BlockId<Block>) -> ClientResult<()> {
assert!(self.set_head.is_none(), "Only one set head per operation is allowed");
self.set_head = Some(block);
Ok(())
}
}
struct StorageDb<Block: BlockT> {
pub db: Arc<dyn Database<DbHash>>,
pub state_db: StateDb<Block::Hash, Vec<u8>>,
prefix_keys: bool,
}
impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for StorageDb<Block> {
fn get(&self, key: &Block::Hash, prefix: Prefix) -> Result<Option<DBValue>, String> {
if self.prefix_keys {
let key = prefixed_key::<HashFor<Block>>(key, prefix);
self.state_db.get(&key, self)
} else {
self.state_db.get(key.as_ref(), self)
}
.map_err(|e| format!("Database backend error: {:?}", e))
}
}
impl<Block: BlockT> sc_state_db::NodeDb for StorageDb<Block> {
type Error = io::Error;
type Key = [u8];
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.db.get(columns::STATE, key))
}
}
struct DbGenesisStorage<Block: BlockT>(pub Block::Hash);
impl<Block: BlockT> DbGenesisStorage<Block> {
pub fn new() -> Self {
let mut root = Block::Hash::default();
let mut mdb = MemoryDB::<HashFor<Block>>::default();
sp_state_machine::TrieDBMut::<HashFor<Block>>::new(&mut mdb, &mut root);
DbGenesisStorage(root)
}
}
impl<Block: BlockT> sp_state_machine::Storage<HashFor<Block>> for DbGenesisStorage<Block> {
fn get(&self, _key: &Block::Hash, _prefix: Prefix) -> Result<Option<DBValue>, String> {
Ok(None)
}
}
struct Frozen<T: Clone> {
at: std::time::Instant,
value: Option<T>,
}
pub(crate) struct FrozenForDuration<T: Clone> {
duration: std::time::Duration,
value: parking_lot::Mutex<Frozen<T>>,
}
impl<T: Clone> FrozenForDuration<T> {
fn new(duration: std::time::Duration) -> Self {
Self {
duration,
value: Frozen { at: std::time::Instant::now(), value: None }.into(),
}
}
fn take_or_else<F>(&self, f: F) -> T where F: FnOnce() -> T {
let mut lock = self.value.lock();
if lock.at.elapsed() > self.duration || lock.value.is_none() {
let new_value = f();
lock.at = std::time::Instant::now();
lock.value = Some(new_value.clone());
new_value
} else {
lock.value.as_ref().expect("checked with lock above").clone()
}
}
}
pub struct Backend<Block: BlockT> {
storage: Arc<StorageDb<Block>>,
offchain_storage: offchain::LocalStorage,
changes_tries_storage: DbChangesTrieStorage<Block>,
blockchain: BlockchainDb<Block>,
canonicalization_delay: u64,
shared_cache: SharedCache<Block>,
import_lock: Arc<RwLock<()>>,
is_archive: bool,
io_stats: FrozenForDuration<(kvdb::IoStats, StateUsageInfo)>,
state_usage: Arc<StateUsageStats>,
}
impl<Block: BlockT> Backend<Block> {
pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> ClientResult<Self> {
let db = crate::utils::open_database::<Block>(&config, DatabaseType::Full)?;
Self::from_database(db as Arc<_>, canonicalization_delay, &config)
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self {
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
let db_setting = DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
pruning: PruningMode::keep_blocks(keep_blocks),
source: DatabaseSettingsSrc::Custom(db),
};
Self::new(db_setting, canonicalization_delay).expect("failed to create test-db")
}
fn from_database(
db: Arc<dyn Database<DbHash>>,
canonicalization_delay: u64,
config: &DatabaseSettings,
) -> ClientResult<Self> {
let is_archive_pruning = config.pruning.is_archive();
let blockchain = BlockchainDb::new(db.clone())?;
let meta = blockchain.meta.clone();
let map_e = |e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(
format!("State database error: {:?}", e)
);
let state_db: StateDb<_, _> = StateDb::new(
config.pruning.clone(),
!config.source.supports_ref_counting(),
&StateMetaDb(&*db),
).map_err(map_e)?;
let storage_db = StorageDb {
db: db.clone(),
state_db,
prefix_keys: !config.source.supports_ref_counting(),
};
let offchain_storage = offchain::LocalStorage::new(db.clone());
let changes_tries_storage = DbChangesTrieStorage::new(
db,
blockchain.header_metadata_cache.clone(),
columns::META,
columns::CHANGES_TRIE,
columns::KEY_LOOKUP,
columns::HEADER,
columns::CACHE,
meta,
if is_archive_pruning {
None
} else {
Some(MIN_BLOCKS_TO_KEEP_CHANGES_TRIES_FOR)
},
)?;
Ok(Backend {
storage: Arc::new(storage_db),
offchain_storage,
changes_tries_storage,
blockchain,
canonicalization_delay,
shared_cache: new_shared_cache(
config.state_cache_size,
config.state_cache_child_ratio.unwrap_or(DEFAULT_CHILD_RATIO),
),
import_lock: Default::default(),
is_archive: is_archive_pruning,
io_stats: FrozenForDuration::new(std::time::Duration::from_secs(1)),
state_usage: Arc::new(StateUsageStats::new()),
})
}
fn set_head_with_transaction(
&self,
transaction: &mut Transaction<DbHash>,
route_to: Block::Hash,
best_to: (NumberFor<Block>, Block::Hash),
) -> ClientResult<(Vec<Block::Hash>, Vec<Block::Hash>)> {
let mut enacted = Vec::default();
let mut retracted = Vec::default();
let meta = self.blockchain.meta.read();
if meta.best_hash != Default::default() {
let tree_route = sp_blockchain::tree_route(
&self.blockchain,
meta.best_hash,
route_to,
)?;
for r in tree_route.retracted() {
if r.hash == meta.finalized_hash {
warn!(
"Potential safety failure: reverting finalized block {:?}",
(&r.number, &r.hash)
);
return Err(::sp_blockchain::Error::NotInFinalizedChain.into());
}
retracted.push(r.hash.clone());
utils::remove_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
r.number
)?;
}
for e in tree_route.enacted() {
enacted.push(e.hash.clone());
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
e.number,
e.hash
)?;
}
}
let lookup_key = utils::number_and_hash_to_lookup_key(best_to.0, &best_to.1)?;
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, lookup_key);
utils::insert_number_to_key_mapping(
transaction,
columns::KEY_LOOKUP,
best_to.0,
best_to.1,
)?;
Ok((enacted, retracted))
}
fn ensure_sequential_finalization(
&self,
header: &Block::Header,
last_finalized: Option<Block::Hash>,
) -> ClientResult<()> {
let last_finalized = last_finalized.unwrap_or_else(|| self.blockchain.meta.read().finalized_hash);
if *header.parent_hash() != last_finalized {
return Err(::sp_blockchain::Error::NonSequentialFinalization(
format!("Last finalized {:?} not parent of {:?}", last_finalized, header.hash()),
).into());
}
Ok(())
}
fn finalize_block_with_transaction(
&self,
transaction: &mut Transaction<DbHash>,
hash: &Block::Hash,
header: &Block::Header,
last_finalized: Option<Block::Hash>,
justification: Option<Justification>,
changes_trie_cache_ops: &mut Option<DbChangesTrieStorageTransaction<Block>>,
finalization_displaced: &mut Option<FinalizationDisplaced<Block::Hash, NumberFor<Block>>>,
) -> ClientResult<(Block::Hash, <Block::Header as HeaderT>::Number, bool, bool)> {
let number = *header.number();
self.ensure_sequential_finalization(header, last_finalized)?;
self.note_finalized(
transaction,
false,
header,
*hash,
changes_trie_cache_ops,
finalization_displaced,
)?;
if let Some(justification) = justification {
transaction.set_from_vec(
columns::JUSTIFICATION,
&utils::number_and_hash_to_lookup_key(number, hash)?,
justification.encode(),
);
}
Ok((*hash, number, false, true))
}
fn force_delayed_canonicalize(
&self,
transaction: &mut Transaction<DbHash>,
hash: Block::Hash,
number: NumberFor<Block>,
)
-> ClientResult<()>
{
let number_u64 = number.saturated_into::<u64>();
if number_u64 > self.canonicalization_delay {
let new_canonical = number_u64 - self.canonicalization_delay;
if new_canonical <= self.storage.state_db.best_canonical().unwrap_or(0) {
return Ok(())
}
let hash = if new_canonical == number_u64 {
hash
} else {
::sc_client_api::blockchain::HeaderBackend::hash(&self.blockchain, new_canonical.saturated_into())?
.expect("existence of block with number `new_canonical` \
implies existence of blocks with all numbers before it; qed")
};
trace!(target: "db", "Canonicalize block #{} ({:?})", new_canonical, hash);
let commit = self.storage.state_db.canonicalize_block(&hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
};
Ok(())
}
fn try_commit_operation(
&self,
mut operation: BlockImportOperation<Block>,
) -> ClientResult<()> {
let mut transaction = Transaction::new();
let mut finalization_displaced_leaves = None;
operation.apply_aux(&mut transaction);
operation.apply_offchain(&mut transaction);
let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len());
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
let mut changes_trie_cache_ops = None;
for (block, justification) in operation.finalized_blocks {
let block_hash = self.blockchain.expect_block_hash_from_id(&block)?;
let block_header = self.blockchain.expect_header(BlockId::Hash(block_hash))?;
meta_updates.push(self.finalize_block_with_transaction(
&mut transaction,
&block_hash,
&block_header,
Some(last_finalized_hash),
justification,
&mut changes_trie_cache_ops,
&mut finalization_displaced_leaves,
)?);
last_finalized_hash = block_hash;
}
let imported = if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.hash();
let parent_hash = *pending_block.header.parent_hash();
let number = pending_block.header.number().clone();
let lookup_key = utils::number_and_hash_to_lookup_key(number, hash)?;
let (enacted, retracted) = if pending_block.leaf_state.is_best() {
self.set_head_with_transaction(&mut transaction, parent_hash, (number, hash))?
} else {
(Default::default(), Default::default())
};
utils::insert_hash_to_key_mapping(
&mut transaction,
columns::KEY_LOOKUP,
number,
hash,
)?;
let header_metadata = CachedHeaderMetadata::from(&pending_block.header);
self.blockchain.insert_header_metadata(
header_metadata.hash,
header_metadata,
);
transaction.set_from_vec(columns::HEADER, &lookup_key, pending_block.header.encode());
if let Some(body) = &pending_block.body {
transaction.set_from_vec(columns::BODY, &lookup_key, body.encode());
}
if let Some(justification) = pending_block.justification {
transaction.set_from_vec(columns::JUSTIFICATION, &lookup_key, justification.encode());
}
if number.is_zero() {
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
transaction.set(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
if operation.changes_trie_config_update.is_none() {
operation.changes_trie_config_update = Some(None);
}
}
let finalized = if operation.commit_state {
let mut changeset: sc_state_db::ChangeSet<Vec<u8>> = sc_state_db::ChangeSet::default();
let mut ops: u64 = 0;
let mut bytes: u64 = 0;
let mut removal: u64 = 0;
let mut bytes_removal: u64 = 0;
for (mut key, (val, rc)) in operation.db_updates.drain() {
if !self.storage.prefix_keys {
key.drain(0 .. key.len() - DB_HASH_LEN);
};
if rc > 0 {
ops += 1;
bytes += key.len() as u64 + val.len() as u64;
if rc == 1 {
changeset.inserted.push((key, val.to_vec()));
} else {
changeset.inserted.push((key.clone(), val.to_vec()));
for _ in 0 .. rc - 1 {
changeset.inserted.push((key.clone(), Default::default()));
}
}
} else if rc < 0 {
removal += 1;
bytes_removal += key.len() as u64;
if rc == -1 {
changeset.deleted.push(key);
} else {
for _ in 0 .. -rc {
changeset.deleted.push(key.clone());
}
}
}
}
self.state_usage.tally_writes_nodes(ops, bytes);
self.state_usage.tally_removed_nodes(removal, bytes_removal);
let mut ops: u64 = 0;
let mut bytes: u64 = 0;
for (key, value) in operation.storage_updates.iter()
.chain(operation.child_storage_updates.iter().flat_map(|(_, s)| s.iter())) {
ops += 1;
bytes += key.len() as u64;
if let Some(v) = value.as_ref() {
bytes += v.len() as u64;
}
}
self.state_usage.tally_writes(ops, bytes);
let number_u64 = number.saturated_into::<u64>();
let commit = self.storage.state_db.insert_block(
&hash,
number_u64,
&pending_block.header.parent_hash(),
changeset,
).map_err(|e: sc_state_db::Error<io::Error>|
sp_blockchain::Error::from(format!("State database error: {:?}", e))
)?;
apply_state_commit(&mut transaction, commit);
let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
finalized
} else {
false
};
let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
let changes_trie_updates = operation.changes_trie_updates;
let changes_trie_config_update = operation.changes_trie_config_update;
changes_trie_cache_ops = Some(self.changes_tries_storage.commit(
&mut transaction,
changes_trie_updates,
cache::ComplexBlockId::new(
*header.parent_hash(),
if number.is_zero() { Zero::zero() } else { number - One::one() },
),
cache::ComplexBlockId::new(hash, number),
header,
finalized,
changes_trie_config_update,
changes_trie_cache_ops,
)?);
self.state_usage.merge_sm(operation.old_state.usage_info());
let cache = operation.old_state.into_cache_changes();
if finalized {
self.ensure_sequential_finalization(header, Some(last_finalized_hash))?;
self.note_finalized(
&mut transaction,
true,
header,
hash,
&mut changes_trie_cache_ops,
&mut finalization_displaced_leaves,
)?;
} else {
self.force_delayed_canonicalize(&mut transaction, hash, *header.number())?
}
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, is_best);
let displaced_leaf = {
let mut leaves = self.blockchain.leaves.write();
let displaced_leaf = leaves.import(hash, number, parent_hash);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
displaced_leaf
};
let mut children = children::read_children(
&*self.storage.db,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
)?;
children.push(hash);
children::write_children(
&mut transaction,
columns::META,
meta_keys::CHILDREN_PREFIX,
parent_hash,
children,
);
meta_updates.push((hash, number, pending_block.leaf_state.is_best(), finalized));
Some((number, hash, enacted, retracted, displaced_leaf, is_best, cache))
} else {
None
};
let cache_update = if let Some(set_head) = operation.set_head {
if let Some(header) = sc_client_api::blockchain::HeaderBackend::header(&self.blockchain, set_head)? {
let number = header.number();
let hash = header.hash();
let (enacted, retracted) = self.set_head_with_transaction(
&mut transaction,
hash.clone(),
(number.clone(), hash.clone())
)?;
meta_updates.push((hash, *number, true, false));
Some((enacted, retracted))
} else {
return Err(sp_blockchain::Error::UnknownBlock(format!("Cannot set head {:?}", set_head)))
}
} else {
None
};
self.storage.db.commit(transaction)?;
if let Some((
number,
hash,
enacted,
retracted,
_displaced_leaf,
is_best,
mut cache,
)) = imported {
cache.sync_cache(
&enacted,
&retracted,
operation.storage_updates,
operation.child_storage_updates,
Some(hash),
Some(number),
is_best,
);
}
if let Some(changes_trie_build_cache_update) = operation.changes_trie_build_cache_update {
self.changes_tries_storage.commit_build_cache(changes_trie_build_cache_update);
}
self.changes_tries_storage.post_commit(changes_trie_cache_ops);
if let Some((enacted, retracted)) = cache_update {
self.shared_cache.lock().sync(&enacted, &retracted);
}
for (hash, number, is_best, is_finalized) in meta_updates {
self.blockchain.update_meta(hash, number, is_best, is_finalized);
}
Ok(())
}
fn note_finalized(
&self,
transaction: &mut Transaction<DbHash>,
is_inserted: bool,
f_header: &Block::Header,
f_hash: Block::Hash,
changes_trie_cache_ops: &mut Option<DbChangesTrieStorageTransaction<Block>>,
displaced: &mut Option<FinalizationDisplaced<Block::Hash, NumberFor<Block>>>
) -> ClientResult<()> {
let f_num = f_header.number().clone();
if self.storage.state_db.best_canonical().map(|c| f_num.saturated_into::<u64>() > c).unwrap_or(true) {
let lookup_key = utils::number_and_hash_to_lookup_key(f_num, f_hash.clone())?;
transaction.set_from_vec(columns::META, meta_keys::FINALIZED_BLOCK, lookup_key);
let commit = self.storage.state_db.canonicalize_block(&f_hash)
.map_err(|e: sc_state_db::Error<io::Error>| sp_blockchain::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(transaction, commit);
if !f_num.is_zero() {
let new_changes_trie_cache_ops = self.changes_tries_storage.finalize(
transaction,
*f_header.parent_hash(),
f_hash,
f_num,
if is_inserted { Some(&f_header) } else { None },
changes_trie_cache_ops.take(),
)?;
*changes_trie_cache_ops = Some(new_changes_trie_cache_ops);
}
}
let new_displaced = self.blockchain.leaves.write().finalize_height(f_num);
match displaced {
x @ &mut None => *x = Some(new_displaced),
&mut Some(ref mut displaced) => displaced.merge(new_displaced),
}
Ok(())
}
}
fn apply_state_commit(transaction: &mut Transaction<DbHash>, commit: sc_state_db::CommitSet<Vec<u8>>) {
for (key, val) in commit.data.inserted.into_iter() {
transaction.set_from_vec(columns::STATE, &key[..], val);
}
for key in commit.data.deleted.into_iter() {
transaction.remove(columns::STATE, &key[..]);
}
for (key, val) in commit.meta.inserted.into_iter() {
transaction.set_from_vec(columns::STATE_META, &key[..], val);
}
for key in commit.meta.deleted.into_iter() {
transaction.remove(columns::STATE_META, &key[..]);
}
}
impl<Block> sc_client_api::backend::AuxStore for Backend<Block> where Block: BlockT {
fn insert_aux<
'a,
'b: 'a,
'c: 'a,
I: IntoIterator<Item=&'a(&'c [u8], &'c [u8])>,
D: IntoIterator<Item=&'a &'b [u8]>,
>(&self, insert: I, delete: D) -> ClientResult<()> {
let mut transaction = Transaction::new();
for (k, v) in insert {
transaction.set(columns::AUX, k, v);
}
for k in delete {
transaction.remove(columns::AUX, k);
}
self.storage.db.commit(transaction)?;
Ok(())
}
fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
Ok(self.storage.db.get(columns::AUX, key))
}
}
impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
type BlockImportOperation = BlockImportOperation<Block>;
type Blockchain = BlockchainDb<Block>;
type State = SyncingCachingState<RefTrackingState<Block>, Block>;
type OffchainStorage = offchain::LocalStorage;
fn begin_operation(&self) -> ClientResult<Self::BlockImportOperation> {
let mut old_state = self.state_at(BlockId::Hash(Default::default()))?;
old_state.disable_syncing();
Ok(BlockImportOperation {
pending_block: None,
old_state,
db_updates: PrefixedMemoryDB::default(),
storage_updates: Default::default(),
child_storage_updates: Default::default(),
offchain_storage_updates: Default::default(),
changes_trie_config_update: None,
changes_trie_updates: MemoryDB::default(),
changes_trie_build_cache_update: None,
aux_ops: Vec::new(),
finalized_blocks: Vec::new(),
set_head: None,
commit_state: false,
})
}
fn begin_state_operation(
&self,
operation: &mut Self::BlockImportOperation,
block: BlockId<Block>,
) -> ClientResult<()> {
operation.old_state = self.state_at(block)?;
operation.old_state.disable_syncing();
operation.commit_state = true;
Ok(())
}
fn commit_operation(
&self,
operation: Self::BlockImportOperation,
) -> ClientResult<()> {
let usage = operation.old_state.usage_info();
self.state_usage.merge_sm(usage);
match self.try_commit_operation(operation) {
Ok(_) => {
self.storage.state_db.apply_pending();
Ok(())
},
e @ Err(_) => {
self.storage.state_db.revert_pending();
e
}
}
}
fn finalize_block(
&self,
block: BlockId<Block>,
justification: Option<Justification>,
) -> ClientResult<()> {
let mut transaction = Transaction::new();
let hash = self.blockchain.expect_block_hash_from_id(&block)?;
let header = self.blockchain.expect_header(block)?;
let mut displaced = None;
let mut changes_trie_cache_ops = None;
let (hash, number, is_best, is_finalized) = self.finalize_block_with_transaction(
&mut transaction,
&hash,
&header,
None,
justification,
&mut changes_trie_cache_ops,
&mut displaced,
)?;
self.storage.db.commit(transaction)?;
self.blockchain.update_meta(hash, number, is_best, is_finalized);
self.changes_tries_storage.post_commit(changes_trie_cache_ops);
Ok(())
}
fn changes_trie_storage(&self) -> Option<&dyn PrunableStateChangesTrieStorage<Block>> {
Some(&self.changes_tries_storage)
}
fn offchain_storage(&self) -> Option<Self::OffchainStorage> {
Some(self.offchain_storage.clone())
}
fn usage_info(&self) -> Option<UsageInfo> {
let (io_stats, state_stats) = self.io_stats.take_or_else(||
(
kvdb::IoStats::empty(),
self.state_usage.take(),
)
);
let database_cache = MemorySize::from_bytes(0);
let state_cache = MemorySize::from_bytes(
(*&self.shared_cache).lock().used_storage_cache_size(),
);
let state_db = self.storage.state_db.memory_info();
Some(UsageInfo {
memory: MemoryInfo {
state_cache,
database_cache,
state_db,
},
io: IoInfo {
transactions: io_stats.transactions,
bytes_read: io_stats.bytes_read,
bytes_written: io_stats.bytes_written,
writes: io_stats.writes,
reads: io_stats.reads,
average_transaction_size: io_stats.avg_transaction_size() as u64,
state_reads: state_stats.reads.ops,
state_writes: state_stats.writes.ops,
state_writes_cache: state_stats.overlay_writes.ops,
state_reads_cache: state_stats.cache_reads.ops,
state_writes_nodes: state_stats.nodes_writes.ops,
},
})
}
fn revert(
&self,
n: NumberFor<Block>,
revert_finalized: bool,
) -> ClientResult<(NumberFor<Block>, HashSet<Block::Hash>)> {
let mut reverted_finalized = HashSet::new();
let mut best_number = self.blockchain.info().best_number;
let mut best_hash = self.blockchain.info().best_hash;
let finalized = self.blockchain.info().finalized_number;
let revertible = best_number - finalized;
let n = if !revert_finalized && revertible < n {
revertible
} else {
n
};
let mut revert_blocks = || -> ClientResult<NumberFor<Block>> {
for c in 0 .. n.saturated_into::<u64>() {
if best_number.is_zero() {
return Ok(c.saturated_into::<NumberFor<Block>>())
}
let mut transaction = Transaction::new();
let removed_number = best_number;
let removed = self.blockchain.header(BlockId::Number(best_number))?.ok_or_else(
|| sp_blockchain::Error::UnknownBlock(
format!("Error reverting to {}. Block hash not found.", best_number)))?;
let removed_hash = removed.hash();
let prev_number = best_number.saturating_sub(One::one());
let prev_hash = self.blockchain.hash(prev_number)?.ok_or_else(
|| sp_blockchain::Error::UnknownBlock(
format!("Error reverting to {}. Block hash not found.", best_number))
)?;
if !self.have_state_at(&prev_hash, prev_number) {
return Ok(c.saturated_into::<NumberFor<Block>>())
}
match self.storage.state_db.revert_one() {
Some(commit) => {
apply_state_commit(&mut transaction, commit);
best_number = prev_number;
best_hash = prev_hash;
let update_finalized = best_number < finalized;
let key = utils::number_and_hash_to_lookup_key(best_number.clone(), &best_hash)?;
let changes_trie_cache_ops = self.changes_tries_storage.revert(
&mut transaction,
&cache::ComplexBlockId::new(
removed.hash(),
removed_number,
),
)?;
if update_finalized {
transaction.set_from_vec(
columns::META,
meta_keys::FINALIZED_BLOCK,
key.clone()
);
reverted_finalized.insert(removed_hash);
}
transaction.set_from_vec(columns::META, meta_keys::BEST_BLOCK, key);
transaction.remove(columns::KEY_LOOKUP, removed.hash().as_ref());
children::remove_children(&mut transaction, columns::META, meta_keys::CHILDREN_PREFIX, best_hash);
self.storage.db.commit(transaction)?;
self.changes_tries_storage.post_commit(Some(changes_trie_cache_ops));
self.blockchain.update_meta(best_hash, best_number, true, update_finalized);
}
None => return Ok(c.saturated_into::<NumberFor<Block>>())
}
}
Ok(n)
};
let reverted = revert_blocks()?;
let revert_leaves = || -> ClientResult<()> {
let mut transaction = Transaction::new();
let mut leaves = self.blockchain.leaves.write();
leaves.revert(best_hash, best_number);
leaves.prepare_transaction(&mut transaction, columns::META, meta_keys::LEAF_PREFIX);
self.storage.db.commit(transaction)?;
Ok(())
};
revert_leaves()?;
Ok((reverted, reverted_finalized))
}
fn blockchain(&self) -> &BlockchainDb<Block> {
&self.blockchain
}
fn state_at(&self, block: BlockId<Block>) -> ClientResult<Self::State> {
use sc_client_api::blockchain::HeaderBackend as BcHeaderBackend;
match block {
BlockId::Hash(h) if h == Default::default() => {
let genesis_storage = DbGenesisStorage::<Block>::new();
let root = genesis_storage.0.clone();
let db_state = DbState::<Block>::new(Arc::new(genesis_storage), root);
let state = RefTrackingState::new(db_state, self.storage.clone(), None);
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
None,
);
return Ok(SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
));
},
_ => {}
}
let hash = match block {
BlockId::Hash(h) => h,
BlockId::Number(n) => self.blockchain.hash(n)?.ok_or_else(||
sp_blockchain::Error::UnknownBlock(format!("Unknown block number {}", n))
)?,
};
match self.blockchain.header_metadata(hash) {
Ok(ref hdr) => {
if !self.have_state_at(&hash, hdr.number) {
return Err(
sp_blockchain::Error::UnknownBlock(
format!("State already discarded for {:?}", block)
)
)
}
if let Ok(()) = self.storage.state_db.pin(&hash) {
let root = hdr.state_root;
let db_state = DbState::<Block>::new(self.storage.clone(), root);
let state = RefTrackingState::new(
db_state,
self.storage.clone(),
Some(hash.clone()),
);
let caching_state = CachingState::new(
state,
self.shared_cache.clone(),
Some(hash),
);
Ok(SyncingCachingState::new(
caching_state,
self.state_usage.clone(),
self.blockchain.meta.clone(),
self.import_lock.clone(),
))
} else {
Err(
sp_blockchain::Error::UnknownBlock(
format!("State already discarded for {:?}", block)
)
)
}
},
Err(e) => Err(e),
}
}
fn have_state_at(&self, hash: &Block::Hash, number: NumberFor<Block>) -> bool {
if self.is_archive {
match self.blockchain.header_metadata(hash.clone()) {
Ok(header) => {
sp_state_machine::Storage::get(
self.storage.as_ref(),
&header.state_root,
(&[], None),
).unwrap_or(None).is_some()
},
_ => false,
}
} else {
!self.storage.state_db.is_pruned(hash, number.saturated_into::<u64>())
}
}
fn get_import_lock(&self) -> &RwLock<()> {
&*self.import_lock
}
}
impl<Block: BlockT> sc_client_api::backend::LocalBackend<Block> for Backend<Block> {}
#[cfg(test)]
pub(crate) mod tests {
use hash_db::{HashDB, EMPTY_PREFIX};
use super::*;
use crate::columns;
use sp_core::H256;
use sc_client_api::backend::{Backend as BTrait, BlockImportOperation as Op};
use sc_client_api::blockchain::Backend as BLBTrait;
use sp_runtime::testing::{Header, Block as RawBlock, ExtrinsicWrapper};
use sp_runtime::traits::{Hash, BlakeTwo256};
use sp_runtime::generic::DigestItem;
use sp_state_machine::{TrieMut, TrieDBMut};
use sp_blockchain::{lowest_common_ancestor, tree_route};
pub(crate) type Block = RawBlock<ExtrinsicWrapper<u64>>;
pub fn prepare_changes(changes: Vec<(Vec<u8>, Vec<u8>)>) -> (H256, MemoryDB<BlakeTwo256>) {
let mut changes_root = H256::default();
let mut changes_trie_update = MemoryDB::<BlakeTwo256>::default();
{
let mut trie = TrieDBMut::<BlakeTwo256>::new(
&mut changes_trie_update,
&mut changes_root
);
for (key, value) in changes {
trie.insert(&key, &value).unwrap();
}
}
(changes_root, changes_trie_update)
}
pub fn insert_header(
backend: &Backend<Block>,
number: u64,
parent_hash: H256,
changes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
extrinsics_root: H256,
) -> H256 {
use sp_runtime::testing::Digest;
let mut digest = Digest::default();
let mut changes_trie_update = Default::default();
if let Some(changes) = changes {
let (root, update) = prepare_changes(changes);
digest.push(DigestItem::ChangesTrieRoot(root));
changes_trie_update = update;
}
let header = Header {
number,
parent_hash,
state_root: BlakeTwo256::trie_root(Vec::new()),
digest,
extrinsics_root,
};
let header_hash = header.hash();
let block_id = if number == 0 {
BlockId::Hash(Default::default())
} else {
BlockId::Number(number - 1)
};
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, block_id).unwrap();
op.set_block_data(header, Some(Vec::new()), None, NewBlockState::Best).unwrap();
op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap();
backend.commit_operation(op).unwrap();
header_hash
}
#[test]
fn block_hash_inserted_correctly() {
let backing = {
let db = Backend::<Block>::new_test(1, 0);
for i in 0..10 {
assert!(db.blockchain().hash(i).unwrap().is_none());
{
let id = if i == 0 {
BlockId::Hash(Default::default())
} else {
BlockId::Number(i - 1)
};
let mut op = db.begin_operation().unwrap();
db.begin_state_operation(&mut op, id).unwrap();
let header = Header {
number: i,
parent_hash: if i == 0 {
Default::default()
} else {
db.blockchain.hash(i - 1).unwrap().unwrap()
},
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
db.commit_operation(op).unwrap();
}
assert!(db.blockchain().hash(i).unwrap().is_some())
}
db.storage.db.clone()
};
let backend = Backend::<Block>::new(DatabaseSettings {
state_cache_size: 16777216,
state_cache_child_ratio: Some((50, 100)),
pruning: PruningMode::keep_blocks(1),
source: DatabaseSettingsSrc::Custom(backing),
}, 0).unwrap();
assert_eq!(backend.blockchain().info().best_number, 9);
for i in 0..10 {
assert!(backend.blockchain().hash(i).unwrap().is_some())
}
}
#[test]
fn set_state_data() {
let db = Backend::<Block>::new_test(2, 0);
let hash = {
let mut op = db.begin_operation().unwrap();
db.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap();
let mut header = Header {
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage = vec![
(vec![1, 3, 5], vec![2, 4, 6]),
(vec![1, 2, 3], vec![9, 9, 9]),
];
header.state_root = op.old_state.storage_root(storage
.iter()
.map(|(x, y)| (&x[..], Some(&y[..])))
).0.into();
let hash = header.hash();
op.reset_storage(Storage {
top: storage.into_iter().collect(),
children_default: Default::default(),
}).unwrap();
op.set_block_data(
header.clone(),
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
db.commit_operation(op).unwrap();
let state = db.state_at(BlockId::Number(0)).unwrap();
assert_eq!(state.storage(&[1, 3, 5]).unwrap(), Some(vec![2, 4, 6]));
assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), None);
hash
};
{
let mut op = db.begin_operation().unwrap();
db.begin_state_operation(&mut op, BlockId::Number(0)).unwrap();
let mut header = Header {
number: 1,
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage = vec![
(vec![1, 3, 5], None),
(vec![5, 5, 5], Some(vec![4, 5, 6])),
];
let (root, overlay) = op.old_state.storage_root(
storage.iter()
.map(|(k, v)| (&k[..], v.as_ref().map(|v| &v[..])))
);
op.update_db_storage(overlay).unwrap();
header.state_root = root.into();
op.update_storage(storage, Vec::new()).unwrap();
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
db.commit_operation(op).unwrap();
let state = db.state_at(BlockId::Number(1)).unwrap();
assert_eq!(state.storage(&[1, 3, 5]).unwrap(), None);
assert_eq!(state.storage(&[1, 2, 3]).unwrap(), Some(vec![9, 9, 9]));
assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6]));
}
}
#[test]
fn delete_only_when_negative_rc() {
sp_tracing::try_init_simple();
let key;
let backend = Backend::<Block>::new_test(1, 0);
let hash = {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(Default::default())).unwrap();
let mut header = Header {
number: 0,
parent_hash: Default::default(),
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
header.state_root = op.old_state.storage_root(std::iter::empty()).0.into();
let hash = header.hash();
op.reset_storage(Storage {
top: Default::default(),
children_default: Default::default(),
}).unwrap();
key = op.db_updates.insert(EMPTY_PREFIX, b"hello");
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
backend.commit_operation(op).unwrap();
assert_eq!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap(), &b"hello"[..]);
hash
};
let hash = {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Number(0)).unwrap();
let mut header = Header {
number: 1,
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
let hash = header.hash();
op.db_updates.insert(EMPTY_PREFIX, b"hello");
op.db_updates.remove(&key, EMPTY_PREFIX);
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
backend.commit_operation(op).unwrap();
assert_eq!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).unwrap(), &b"hello"[..]);
hash
};
let hash = {
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Number(1)).unwrap();
let mut header = Header {
number: 2,
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
let hash = header.hash();
op.db_updates.remove(&key, EMPTY_PREFIX);
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
backend.commit_operation(op).unwrap();
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).is_some());
hash
};
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Number(2)).unwrap();
let mut header = Header {
number: 3,
parent_hash: hash,
state_root: Default::default(),
digest: Default::default(),
extrinsics_root: Default::default(),
};
let storage: Vec<(_, _)> = vec![];
header.state_root = op.old_state.storage_root(storage
.iter()
.cloned()
.map(|(x, y)| (x, Some(y)))
).0.into();
op.set_block_data(
header,
Some(vec![]),
None,
NewBlockState::Best,
).unwrap();
backend.commit_operation(op).unwrap();
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).is_none());
}
backend.finalize_block(BlockId::Number(1), None).unwrap();
backend.finalize_block(BlockId::Number(2), None).unwrap();
backend.finalize_block(BlockId::Number(3), None).unwrap();
assert!(backend.storage.db.get(
columns::STATE,
&sp_trie::prefixed_key::<BlakeTwo256>(&key, EMPTY_PREFIX)
).is_none());
}
#[test]
fn tree_route_works() {
let backend = Backend::<Block>::new_test(1000, 100);
let blockchain = backend.blockchain();
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let a1 = insert_header(&backend, 1, block0, None, Default::default());
let a2 = insert_header(&backend, 2, a1, None, Default::default());
let a3 = insert_header(&backend, 3, a2, None, Default::default());
let b1 = insert_header(&backend, 1, block0, None, H256::from([1; 32]));
let b2 = insert_header(&backend, 2, b1, None, Default::default());
{
let tree_route = tree_route(blockchain, a3, b2).unwrap();
assert_eq!(tree_route.common_block().hash, block0);
assert_eq!(tree_route.retracted().iter().map(|r| r.hash).collect::<Vec<_>>(), vec![a3, a2, a1]);
assert_eq!(tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(), vec![b1, b2]);
}
{
let tree_route = tree_route(blockchain, a1, a3).unwrap();
assert_eq!(tree_route.common_block().hash, a1);
assert!(tree_route.retracted().is_empty());
assert_eq!(tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(), vec![a2, a3]);
}
{
let tree_route = tree_route(blockchain, a3, a1).unwrap();
assert_eq!(tree_route.common_block().hash, a1);
assert_eq!(tree_route.retracted().iter().map(|r| r.hash).collect::<Vec<_>>(), vec![a3, a2]);
assert!(tree_route.enacted().is_empty());
}
{
let tree_route = tree_route(blockchain, a2, a2).unwrap();
assert_eq!(tree_route.common_block().hash, a2);
assert!(tree_route.retracted().is_empty());
assert!(tree_route.enacted().is_empty());
}
}
#[test]
fn tree_route_child() {
let backend = Backend::<Block>::new_test(1000, 100);
let blockchain = backend.blockchain();
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let block1 = insert_header(&backend, 1, block0, None, Default::default());
{
let tree_route = tree_route(blockchain, block0, block1).unwrap();
assert_eq!(tree_route.common_block().hash, block0);
assert!(tree_route.retracted().is_empty());
assert_eq!(tree_route.enacted().iter().map(|r| r.hash).collect::<Vec<_>>(), vec![block1]);
}
}
#[test]
fn lowest_common_ancestor_works() {
let backend = Backend::<Block>::new_test(1000, 100);
let blockchain = backend.blockchain();
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let a1 = insert_header(&backend, 1, block0, None, Default::default());
let a2 = insert_header(&backend, 2, a1, None, Default::default());
let a3 = insert_header(&backend, 3, a2, None, Default::default());
let b1 = insert_header(&backend, 1, block0, None, H256::from([1; 32]));
let b2 = insert_header(&backend, 2, b1, None, Default::default());
{
let lca = lowest_common_ancestor(blockchain, a3, b2).unwrap();
assert_eq!(lca.hash, block0);
assert_eq!(lca.number, 0);
}
{
let lca = lowest_common_ancestor(blockchain, a1, a3).unwrap();
assert_eq!(lca.hash, a1);
assert_eq!(lca.number, 1);
}
{
let lca = lowest_common_ancestor(blockchain, a3, a1).unwrap();
assert_eq!(lca.hash, a1);
assert_eq!(lca.number, 1);
}
{
let lca = lowest_common_ancestor(blockchain, a2, a3).unwrap();
assert_eq!(lca.hash, a2);
assert_eq!(lca.number, 2);
}
{
let lca = lowest_common_ancestor(blockchain, a2, a1).unwrap();
assert_eq!(lca.hash, a1);
assert_eq!(lca.number, 1);
}
{
let lca = lowest_common_ancestor(blockchain, a2, a2).unwrap();
assert_eq!(lca.hash, a2);
assert_eq!(lca.number, 2);
}
}
#[test]
fn test_tree_route_regression() {
let backend = Backend::<Block>::new_test(10000, 10000);
let blockchain = backend.blockchain();
let genesis = insert_header(&backend, 0, Default::default(), None, Default::default());
let block100 = (1..=100).fold(genesis, |parent, n| {
insert_header(&backend, n, parent, None, Default::default())
});
let block7000 = (101..=7000).fold(block100, |parent, n| {
insert_header(&backend, n, parent, None, Default::default())
});
lowest_common_ancestor(blockchain, genesis, block100).unwrap();
let tree_route = tree_route(blockchain, block100, block7000).unwrap();
assert!(tree_route.retracted().is_empty());
}
#[test]
fn test_leaves_with_complex_block_tree() {
let backend: Arc<Backend<substrate_test_runtime_client::runtime::Block>> = Arc::new(Backend::new_test(20, 20));
substrate_test_runtime_client::trait_tests::test_leaves_for_backend(backend);
}
#[test]
fn test_children_with_complex_block_tree() {
let backend: Arc<Backend<substrate_test_runtime_client::runtime::Block>> = Arc::new(Backend::new_test(20, 20));
substrate_test_runtime_client::trait_tests::test_children_for_backend(backend);
}
#[test]
fn test_blockchain_query_by_number_gets_canonical() {
let backend: Arc<Backend<substrate_test_runtime_client::runtime::Block>> = Arc::new(Backend::new_test(20, 20));
substrate_test_runtime_client::trait_tests::test_blockchain_query_by_number_gets_canonical(backend);
}
#[test]
fn test_leaves_pruned_on_finality() {
let backend: Backend<Block> = Backend::new_test(10, 10);
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let block1_a = insert_header(&backend, 1, block0, None, Default::default());
let block1_b = insert_header(&backend, 1, block0, None, [1; 32].into());
let block1_c = insert_header(&backend, 1, block0, None, [2; 32].into());
assert_eq!(backend.blockchain().leaves().unwrap(), vec![block1_a, block1_b, block1_c]);
let block2_a = insert_header(&backend, 2, block1_a, None, Default::default());
let block2_b = insert_header(&backend, 2, block1_b, None, Default::default());
let block2_c = insert_header(&backend, 2, block1_b, None, [1; 32].into());
assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2_a, block2_b, block2_c, block1_c]);
backend.finalize_block(BlockId::hash(block1_a), None).unwrap();
backend.finalize_block(BlockId::hash(block2_a), None).unwrap();
assert_eq!(backend.blockchain().leaves().unwrap(), vec![block2_a, block2_b, block2_c]);
}
#[test]
fn test_aux() {
let backend: Backend<substrate_test_runtime_client::runtime::Block> = Backend::new_test(0, 0);
assert!(backend.get_aux(b"test").unwrap().is_none());
backend.insert_aux(&[(&b"test"[..], &b"hello"[..])], &[]).unwrap();
assert_eq!(b"hello", &backend.get_aux(b"test").unwrap().unwrap()[..]);
backend.insert_aux(&[], &[&b"test"[..]]).unwrap();
assert!(backend.get_aux(b"test").unwrap().is_none());
}
#[test]
fn test_finalize_block_with_justification() {
use sc_client_api::blockchain::{Backend as BlockChainBackend};
let backend = Backend::<Block>::new_test(10, 10);
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let _ = insert_header(&backend, 1, block0, None, Default::default());
let justification = Some(vec![1, 2, 3]);
backend.finalize_block(BlockId::Number(1), justification.clone()).unwrap();
assert_eq!(
backend.blockchain().justification(BlockId::Number(1)).unwrap(),
justification,
);
}
#[test]
fn test_finalize_multiple_blocks_in_single_op() {
let backend = Backend::<Block>::new_test(10, 10);
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let block1 = insert_header(&backend, 1, block0, None, Default::default());
let block2 = insert_header(&backend, 2, block1, None, Default::default());
let block3 = insert_header(&backend, 3, block2, None, Default::default());
let block4 = insert_header(&backend, 4, block3, None, Default::default());
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(block0)).unwrap();
op.mark_finalized(BlockId::Hash(block1), None).unwrap();
op.mark_finalized(BlockId::Hash(block2), None).unwrap();
backend.commit_operation(op).unwrap();
}
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(block2)).unwrap();
op.mark_finalized(BlockId::Hash(block3), None).unwrap();
op.mark_finalized(BlockId::Hash(block4), None).unwrap();
backend.commit_operation(op).unwrap();
}
}
#[test]
fn test_finalize_non_sequential() {
let backend = Backend::<Block>::new_test(10, 10);
let block0 = insert_header(&backend, 0, Default::default(), None, Default::default());
let block1 = insert_header(&backend, 1, block0, None, Default::default());
let block2 = insert_header(&backend, 2, block1, None, Default::default());
{
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, BlockId::Hash(block0)).unwrap();
op.mark_finalized(BlockId::Hash(block2), None).unwrap();
backend.commit_operation(op).unwrap_err();
}
}
#[test]
fn header_cht_root_works() {
use sc_client_api::ProvideChtRoots;
let backend = Backend::<Block>::new_test(10, 10);
let mut prev_hash = insert_header(&backend, 0, Default::default(), None, Default::default());
let cht_size: u64 = cht::size();
for i in 1..1 + cht_size + cht_size + 1 {
prev_hash = insert_header(&backend, i, prev_hash, None, Default::default());
}
let blockchain = backend.blockchain();
let cht_root_1 = blockchain.header_cht_root(cht_size, cht::start_number(cht_size, 0))
.unwrap().unwrap();
let cht_root_2 = blockchain.header_cht_root(cht_size, cht::start_number(cht_size, 0) + cht_size / 2)
.unwrap().unwrap();
let cht_root_3 = blockchain.header_cht_root(cht_size, cht::end_number(cht_size, 0))
.unwrap().unwrap();
assert_eq!(cht_root_1, cht_root_2);
assert_eq!(cht_root_2, cht_root_3);
}
}