use std::collections::HashMap;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as _, NumberFor}};
use crate::{
error::Error as ConsensusError,
block_import::{
BlockImport, BlockOrigin, BlockImportParams, ImportedAux, JustificationImport, ImportResult,
BlockCheckParams, FinalityProofImport,
},
metrics::Metrics,
};
pub use basic_queue::BasicQueue;
pub type DefaultImportQueue<Block, Client> = BasicQueue<Block, sp_api::TransactionFor<Client, Block>>;
mod basic_queue;
pub mod buffered_link;
pub type BoxBlockImport<B, Transaction> = Box<
dyn BlockImport<B, Error = ConsensusError, Transaction = Transaction> + Send + Sync
>;
pub type BoxJustificationImport<B> = Box<dyn JustificationImport<B, Error=ConsensusError> + Send + Sync>;
pub type BoxFinalityProofImport<B> = Box<
dyn FinalityProofImport<B, Error = ConsensusError> + Send + Sync
>;
pub type Origin = libp2p::PeerId;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct IncomingBlock<B: BlockT> {
pub hash: <B as BlockT>::Hash,
pub header: Option<<B as BlockT>::Header>,
pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
pub justification: Option<Justification>,
pub origin: Option<Origin>,
pub allow_missing_state: bool,
pub import_existing: bool,
}
pub type CacheKeyId = [u8; 4];
pub trait Verifier<B: BlockT>: Send + Sync {
fn verify(
&mut self,
origin: BlockOrigin,
header: B::Header,
justification: Option<Justification>,
body: Option<Vec<B::Extrinsic>>,
) -> Result<(BlockImportParams<B, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String>;
}
pub trait ImportQueue<B: BlockT>: Send {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
fn import_justification(
&mut self,
who: Origin,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification
);
fn import_finality_proof(
&mut self,
who: Origin,
hash: B::Hash,
number: NumberFor<B>,
finality_proof: Vec<u8>
);
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);
}
pub trait Link<B: BlockT>: Send {
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {}
fn justification_imported(&mut self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) {}
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
fn finality_proof_imported(
&mut self,
_who: Origin,
_request_block: (B::Hash, NumberFor<B>),
_finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {}
fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
}
#[derive(Debug, PartialEq)]
pub enum BlockImportResult<N: ::std::fmt::Debug + PartialEq> {
ImportedKnown(N),
ImportedUnknown(N, ImportedAux, Option<Origin>),
}
#[derive(Debug)]
pub enum BlockImportError {
IncompleteHeader(Option<Origin>),
VerificationFailed(Option<Origin>, String),
BadBlock(Option<Origin>),
MissingState,
UnknownParent,
Cancelled,
Other(ConsensusError),
}
pub fn import_single_block<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: &mut dyn BlockImport<B, Transaction = Transaction, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
import_single_block_metered(import_handle, block_origin, block, verifier, None)
}
pub(crate) fn import_single_block_metered<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: &mut dyn BlockImport<B, Transaction = Transaction, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &mut V,
metrics: Option<Metrics>,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
let peer = block.origin;
let (header, justification) = match (block.header, block.justification) {
(Some(header), justification) => (header, justification),
(None, _) => {
if let Some(ref peer) = peer {
debug!(target: "sync", "Header {} was not provided by {} ", block.hash, peer);
} else {
debug!(target: "sync", "Header {} was not provided ", block.hash);
}
return Err(BlockImportError::IncompleteHeader(peer))
},
};
trace!(target: "sync", "Header {} has {:?} logs", block.hash, header.digest().logs().len());
let number = header.number().clone();
let hash = header.hash();
let parent_hash = header.parent_hash().clone();
let import_handler = |import| {
match import {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(number))
},
Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())),
Ok(ImportResult::MissingState) => {
debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::MissingState)
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::UnknownParent)
},
Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::BadBlock(peer.clone()))
},
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Other(e))
}
}
};
match import_handler(import_handle.check_block(BlockCheckParams {
hash,
number,
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
}))? {
BlockImportResult::ImportedUnknown { .. } => (),
r => return Ok(r),
}
let started = wasm_timer::Instant::now();
let (mut import_block, maybe_keys) = verifier.verify(block_origin, header, justification, block.body)
.map_err(|msg| {
if let Some(ref peer) = peer {
trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg);
} else {
trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg);
}
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(false, started.elapsed());
}
BlockImportError::VerificationFailed(peer.clone(), msg)
})?;
if let Some(metrics) = metrics.as_ref() {
metrics.report_verification(true, started.elapsed());
}
let mut cache = HashMap::new();
if let Some(keys) = maybe_keys {
cache.extend(keys.into_iter());
}
import_block.allow_missing_state = block.allow_missing_state;
import_handler(import_handle.import_block(import_block.convert_transaction(), cache))
}