use std::{
collections::{HashSet, HashMap},
hash,
sync::Arc,
};
use crate::base_pool as base;
use crate::listener::Listener;
use crate::rotator::PoolRotator;
use crate::watcher::Watcher;
use serde::Serialize;
use parking_lot::{Mutex, RwLock};
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion},
transaction_validity::{TransactionTag as Tag, ValidTransaction, TransactionSource},
};
use sp_transaction_pool::{error, PoolStatus};
use wasm_timer::Instant;
use futures::channel::mpsc::{channel, Sender};
use retain_mut::RetainMut;
use crate::base_pool::PruneStatus;
use crate::pool::{
EventStream, Options, ChainApi, BlockHash, ExtrinsicHash, ExtrinsicFor, TransactionFor,
};
#[derive(Debug)]
pub enum ValidatedTransaction<Hash, Ex, Error> {
Valid(base::Transaction<Hash, Ex>),
Invalid(Hash, Error),
Unknown(Hash, Error),
}
impl<Hash, Ex, Error> ValidatedTransaction<Hash, Ex, Error> {
pub fn valid_at(
at: u64,
hash: Hash,
source: TransactionSource,
data: Ex,
bytes: usize,
validity: ValidTransaction,
) -> Self {
Self::Valid(base::Transaction {
data,
bytes,
hash,
source,
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: at
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
}
}
pub type ValidatedTransactionFor<B> = ValidatedTransaction<
ExtrinsicHash<B>,
ExtrinsicFor<B>,
<B as ChainApi>::Error,
>;
pub struct ValidatedPool<B: ChainApi> {
api: Arc<B>,
options: Options,
listener: RwLock<Listener<ExtrinsicHash<B>, B>>,
pool: RwLock<base::BasePool<
ExtrinsicHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<Sender<ExtrinsicHash<B>>>>,
rotator: PoolRotator<ExtrinsicHash<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for ValidatedPool<B>
where
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
self.pool.size_of(ops)
}
}
impl<B: ChainApi> ValidatedPool<B> {
pub fn new(options: Options, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
ValidatedPool {
options,
listener: Default::default(),
api,
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: Default::default(),
}
}
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=ExtrinsicHash<B>>) {
self.rotator.ban(now, hashes)
}
pub fn is_banned(&self, hash: &ExtrinsicHash<B>) -> bool {
self.rotator.is_banned(hash)
}
pub fn check_is_known(
&self,
tx_hash: &ExtrinsicHash<B>,
ignore_banned: bool,
) -> Result<(), B::Error> {
if !ignore_banned && self.is_banned(tx_hash) {
Err(error::Error::TemporarilyBanned.into())
} else if self.pool.read().is_imported(tx_hash) {
Err(error::Error::AlreadyImported(Box::new(tx_hash.clone())).into())
} else {
Ok(())
}
}
pub fn submit(
&self,
txs: impl IntoIterator<Item=ValidatedTransactionFor<B>>,
) -> Vec<Result<ExtrinsicHash<B>, B::Error>> {
let results = txs.into_iter()
.map(|validated_tx| self.submit_one(validated_tx))
.collect::<Vec<_>>();
let removed = if results.iter().any(|res| res.is_ok()) {
self.enforce_limits()
} else {
Default::default()
};
results.into_iter().map(|res| match res {
Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()),
other => other,
}).collect()
}
fn submit_one(&self, tx: ValidatedTransactionFor<B>) -> Result<ExtrinsicHash<B>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let imported = self.pool.write().import(tx)?;
if let base::Imported::Ready { ref hash, .. } = imported {
self.import_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send(hash.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
log::warn!(target: "txpool", "[{:?}] Trying to notify an import but the channel is full", hash);
true
} else {
false
}
},
}
});
}
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(imported.hash().clone())
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
},
ValidatedTransaction::Unknown(hash, err) => {
self.listener.write().invalid(&hash, false);
Err(err.into())
},
}
}
fn enforce_limits(&self) -> HashSet<ExtrinsicHash<B>> {
let status = self.pool.read().status();
let ready_limit = &self.options.ready;
let future_limit = &self.options.future;
log::debug!(target: "txpool", "Pool Status: {:?}", status);
if ready_limit.is_exceeded(status.ready, status.ready_bytes)
|| future_limit.is_exceeded(status.future, status.future_bytes)
{
log::debug!(
target: "txpool",
"Enforcing limits ({}/{}kB ready, {}/{}kB future",
ready_limit.count, ready_limit.total_bytes / 1024,
future_limit.count, future_limit.total_bytes / 1024,
);
let removed = {
let mut pool = self.pool.write();
let removed = pool.enforce_limits(ready_limit, future_limit)
.into_iter().map(|x| x.hash.clone()).collect::<HashSet<_>>();
self.rotator.ban(&Instant::now(), removed.iter().map(|x| x.clone()));
removed
};
if !removed.is_empty() {
log::debug!(target: "txpool", "Enforcing limits: {} dropped", removed.len());
}
let mut listener = self.listener.write();
for h in &removed {
listener.dropped(h, None);
}
removed
} else {
Default::default()
}
}
pub fn submit_and_watch(
&self,
tx: ValidatedTransactionFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
match tx {
ValidatedTransaction::Valid(tx) => {
let hash = self.api.hash_and_length(&tx.data).0;
let watcher = self.listener.write().create_watcher(hash);
self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
.pop()
.expect("One extrinsic passed; one result returned; qed")
.map(|_| watcher)
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
},
ValidatedTransaction::Unknown(_, err) => Err(err.into()),
}
}
pub fn resubmit(&self, mut updated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>) {
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Future, Ready, Failed, Dropped };
let (mut initial_statuses, final_statuses) = {
let mut pool = self.pool.write();
let mut initial_statuses = HashMap::new();
let mut txs_to_resubmit = Vec::with_capacity(updated_transactions.len());
while !updated_transactions.is_empty() {
let hash = updated_transactions.keys().next().cloned().expect("transactions is not empty; qed");
let removed = pool.remove_subtree(&[hash.clone()]);
for removed_tx in removed {
let removed_hash = removed_tx.hash.clone();
let updated_transaction = updated_transactions.remove(&removed_hash);
let tx_to_resubmit = if let Some(updated_tx) = updated_transaction {
updated_tx
} else {
let transaction = match Arc::try_unwrap(removed_tx) {
Ok(transaction) => transaction,
Err(transaction) => transaction.duplicate(),
};
ValidatedTransaction::Valid(transaction)
};
initial_statuses.insert(removed_hash.clone(), Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
updated_transactions.remove(&hash);
}
pool.with_futures_enabled(|pool, reject_future_transactions| {
let mut final_statuses = HashMap::new();
for (hash, tx_to_resubmit) in txs_to_resubmit {
match tx_to_resubmit {
ValidatedTransaction::Valid(tx) => match pool.import(tx) {
Ok(imported) => match imported {
base::Imported::Ready { promoted, failed, removed, .. } => {
final_statuses.insert(hash, Status::Ready);
for hash in promoted {
final_statuses.insert(hash, Status::Ready);
}
for hash in failed {
final_statuses.insert(hash, Status::Failed);
}
for tx in removed {
final_statuses.insert(tx.hash.clone(), Status::Dropped);
}
},
base::Imported::Future { .. } => {
final_statuses.insert(hash, Status::Future);
},
},
Err(err) => {
log::warn!(
target: "txpool",
"[{:?}] Removing invalid transaction from update: {}",
hash,
err,
);
final_statuses.insert(hash, Status::Failed);
},
},
ValidatedTransaction::Invalid(_, _) | ValidatedTransaction::Unknown(_, _) => {
final_statuses.insert(hash, Status::Failed);
},
}
}
if reject_future_transactions {
for future_tx in pool.clear_future() {
final_statuses.insert(future_tx.hash.clone(), Status::Dropped);
}
}
(initial_statuses, final_statuses)
})
};
let mut listener = self.listener.write();
for (hash, final_status) in final_statuses {
let initial_status = initial_statuses.remove(&hash);
if initial_status.is_none() || Some(final_status) != initial_status {
match final_status {
Status::Future => listener.future(&hash),
Status::Ready => listener.ready(&hash, None),
Status::Dropped => listener.dropped(&hash, None),
Status::Failed => listener.invalid(&hash, initial_status.is_some()),
}
}
}
}
pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<Option<Vec<Tag>>> {
self.pool.read().by_hashes(&hashes)
.into_iter()
.map(|existing_in_pool| existing_in_pool
.map(|transaction| transaction.provides.iter().cloned().collect()))
.collect()
}
pub fn ready_by_hash(&self, hash: &ExtrinsicHash<B>) -> Option<TransactionFor<B>> {
self.pool.read().ready_by_hash(hash)
}
pub fn prune_tags(
&self,
tags: impl IntoIterator<Item=Tag>,
) -> Result<PruneStatus<ExtrinsicHash<B>, ExtrinsicFor<B>>, B::Error> {
let status = self.pool.write().prune_tags(tags);
{
let mut listener = self.listener.write();
for promoted in &status.promoted {
fire_events(&mut *listener, promoted);
}
for f in &status.failed {
listener.dropped(f, None);
}
}
Ok(status)
}
pub fn resubmit_pruned(
&self,
at: &BlockId<B::Block>,
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
pruned_hashes: Vec<ExtrinsicHash<B>>,
pruned_xts: Vec<ValidatedTransactionFor<B>>,
) -> Result<(), B::Error> {
debug_assert_eq!(pruned_hashes.len(), pruned_xts.len());
let results = self.submit(pruned_xts);
let hashes = results
.into_iter()
.enumerate()
.filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {
Err(Ok(error::Error::InvalidTransaction(_))) => Some(pruned_hashes[idx].clone()),
_ => None,
});
let hashes = hashes.chain(known_imported_hashes.into_iter());
self.fire_pruned(at, hashes)?;
self.clear_stale(at)?;
Ok(())
}
pub fn fire_pruned(
&self,
at: &BlockId<B::Block>,
hashes: impl Iterator<Item=ExtrinsicHash<B>>,
) -> Result<(), B::Error> {
let header_hash = self.api.block_id_to_hash(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?;
let mut listener = self.listener.write();
let mut set = HashSet::with_capacity(hashes.size_hint().0);
for h in hashes {
if !set.contains(&h) {
listener.pruned(header_hash, &h);
set.insert(h);
}
}
Ok(())
}
pub fn clear_stale(&self, at: &BlockId<B::Block>) -> Result<(), B::Error> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?
.saturated_into::<u64>();
let now = Instant::now();
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
.map(|tx| tx.hash.clone())
.collect::<Vec<_>>()
};
let futures_to_remove: Vec<ExtrinsicHash<B>> = {
let p = self.pool.read();
let mut hashes = Vec::new();
for tx in p.futures() {
if self.rotator.ban_if_stale(&now, block_number, &tx) {
hashes.push(tx.hash.clone());
}
}
hashes
};
self.remove_invalid(&to_remove);
self.remove_invalid(&futures_to_remove);
self.rotator.clear_timeouts(&now);
Ok(())
}
#[cfg(test)]
pub fn rotator(&self) -> &PoolRotator<ExtrinsicHash<B>> {
&self.rotator
}
pub fn api(&self) -> &B {
&self.api
}
pub fn import_notification_stream(&self) -> EventStream<ExtrinsicHash<B>> {
const CHANNEL_BUFFER_SIZE: usize = 1024;
let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.import_notification_sinks.lock().push(sink);
stream
}
pub fn on_broadcasted(&self, propagated: HashMap<ExtrinsicHash<B>, Vec<String>>) {
let mut listener = self.listener.write();
for (hash, peers) in propagated.into_iter() {
listener.broadcasted(&hash, peers);
}
}
pub fn remove_invalid(&self, hashes: &[ExtrinsicHash<B>]) -> Vec<TransactionFor<B>> {
if hashes.is_empty() {
return vec![];
}
log::debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
self.rotator.ban(&Instant::now(), hashes.iter().cloned());
let invalid = self.pool.write().remove_subtree(hashes);
log::debug!(target: "txpool", "Removed invalid transactions: {:?}", invalid);
let mut listener = self.listener.write();
for tx in &invalid {
listener.invalid(&tx.hash, true);
}
invalid
}
pub fn ready(&self) -> impl Iterator<Item=TransactionFor<B>> + Send {
self.pool.read().ready()
}
pub fn status(&self) -> PoolStatus {
self.pool.read().status()
}
pub async fn on_block_finalized(&self, block_hash: BlockHash<B>) -> Result<(), B::Error> {
log::trace!(target: "txpool", "Attempting to notify watchers of finalization for {}", block_hash);
self.listener.write().finalized(block_hash);
Ok(())
}
pub fn on_block_retracted(&self, block_hash: BlockHash<B>) {
self.listener.write().retracted(block_hash)
}
}
fn fire_events<H, B, Ex>(
listener: &mut Listener<H, B>,
imported: &base::Imported<H, Ex>,
) where
H: hash::Hash + Eq + traits::Member + Serialize,
B: ChainApi,
{
match *imported {
base::Imported::Ready { ref promoted, ref failed, ref removed, ref hash } => {
listener.ready(hash, None);
for f in failed {
listener.invalid(f, true);
}
for r in removed {
listener.dropped(&r.hash, Some(hash));
}
for p in promoted {
listener.ready(p, None);
}
},
base::Imported::Future { ref hash } => {
listener.future(hash)
},
}
}