use std::{
collections::HashMap,
sync::Arc,
};
use crate::{base_pool as base, watcher::Watcher};
use futures::Future;
use sp_runtime::{
generic::BlockId,
traits::{self, SaturatedConversion, Block as BlockT},
transaction_validity::{
TransactionValidity, TransactionTag as Tag, TransactionValidityError, TransactionSource,
},
};
use sp_transaction_pool::error;
use wasm_timer::Instant;
use futures::channel::mpsc::Receiver;
use crate::validated_pool::ValidatedPool;
pub use crate::validated_pool::ValidatedTransaction;
pub type EventStream<H> = Receiver<H>;
pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
pub type ValidatedTransactionFor<A> = ValidatedTransaction<
ExtrinsicHash<A>,
ExtrinsicFor<A>,
<A as ChainApi>::Error,
>;
pub trait ChainApi: Send + Sync {
type Block: BlockT;
type Error: From<error::Error> + error::IntoPoolError;
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
type BodyFuture: Future<
Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>
> + Unpin + Send + 'static;
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture;
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error>;
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
fn block_body(&self, at: &BlockId<Self::Block>) -> Self::BodyFuture;
}
#[derive(Debug, Clone)]
pub struct Options {
pub ready: base::Limit,
pub future: base::Limit,
pub reject_future_transactions: bool,
}
impl Default for Options {
fn default() -> Self {
Options {
ready: base::Limit {
count: 8192,
total_bytes: 20 * 1024 * 1024,
},
future: base::Limit {
count: 512,
total_bytes: 1 * 1024 * 1024,
},
reject_future_transactions: false,
}
}
}
#[derive(Copy, Clone)]
enum CheckBannedBeforeVerify {
Yes,
No,
}
pub struct Pool<B: ChainApi> {
validated_pool: Arc<ValidatedPool<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
where
ExtrinsicFor<B>: parity_util_mem::MallocSizeOf,
{
fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize {
self.validated_pool.size_of(ops)
}
}
impl<B: ChainApi> Pool<B> {
pub fn new(options: Options, api: Arc<B>) -> Self {
Pool {
validated_pool: Arc::new(ValidatedPool::new(options, api)),
}
}
pub async fn submit_at(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
let xts = xts.into_iter().map(|xt| (source, xt));
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await?;
Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
}
pub async fn resubmit_at(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xts: impl IntoIterator<Item=ExtrinsicFor<B>>,
) -> Result<Vec<Result<ExtrinsicHash<B>, B::Error>>, B::Error> {
let xts = xts.into_iter().map(|xt| (source, xt));
let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await?;
Ok(self.validated_pool.submit(validated_transactions.into_iter().map(|(_, tx)| tx)))
}
pub async fn submit_one(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<ExtrinsicHash<B>, B::Error> {
let res = self.submit_at(at, source, std::iter::once(xt)).await?.pop();
res.expect("One extrinsic passed; one result returned; qed")
}
pub async fn submit_and_watch(
&self,
at: &BlockId<B::Block>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
) -> Result<Watcher<ExtrinsicHash<B>, ExtrinsicHash<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let (_, tx) = self.verify_one(
at,
block_number,
source,
xt,
CheckBannedBeforeVerify::Yes,
).await;
self.validated_pool.submit_and_watch(tx)
}
pub fn resubmit(
&self,
revalidated_transactions: HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
) {
let now = Instant::now();
self.validated_pool.resubmit(revalidated_transactions);
log::debug!(target: "txpool",
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
self.validated_pool.status()
);
}
pub fn prune_known(
&self,
at: &BlockId<B::Block>,
hashes: &[ExtrinsicHash<B>],
) -> Result<(), B::Error> {
let in_pool_tags = self.validated_pool.extrinsics_tags(hashes)
.into_iter().filter_map(|x| x).flat_map(|x| x);
let prune_status = self.validated_pool.prune_tags(in_pool_tags)?;
let pruned_transactions = hashes.into_iter().cloned()
.chain(prune_status.pruned.iter().map(|tx| tx.hash.clone()));
self.validated_pool.fire_pruned(at, pruned_transactions)
}
pub async fn prune(
&self,
at: &BlockId<B::Block>,
parent: &BlockId<B::Block>,
extrinsics: &[ExtrinsicFor<B>],
) -> Result<(), B::Error> {
log::debug!(
target: "txpool",
"Starting pruning of block {:?} (extrinsics: {})",
at,
extrinsics.len()
);
let in_pool_hashes = extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
let all = extrinsics.iter().zip(in_pool_tags.into_iter());
let mut future_tags = Vec::new();
for (extrinsic, in_pool_tags) in all {
match in_pool_tags {
Some(tags) => future_tags.extend(tags),
None => {
let validity = self.validated_pool.api()
.validate_transaction(parent, TransactionSource::InBlock, extrinsic.clone())
.await;
if let Ok(Ok(validity)) = validity {
future_tags.extend(validity.provides);
}
},
}
}
self.prune_tags(at, future_tags, in_pool_hashes).await
}
pub async fn prune_tags(
&self,
at: &BlockId<B::Block>,
tags: impl IntoIterator<Item=Tag>,
known_imported_hashes: impl IntoIterator<Item=ExtrinsicHash<B>> + Clone,
) -> Result<(), B::Error> {
log::debug!(target: "txpool", "Pruning at {:?}", at);
let prune_status = match self.validated_pool.prune_tags(tags) {
Ok(prune_status) => prune_status,
Err(e) => return Err(e),
};
self.validated_pool.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
let pruned_hashes = prune_status.pruned
.iter()
.map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let pruned_transactions = prune_status.pruned
.into_iter()
.map(|tx| (tx.source, tx.data.clone()));
let reverified_transactions = self.verify(
at,
pruned_transactions,
CheckBannedBeforeVerify::Yes,
).await?;
log::trace!(target: "txpool", "Pruning at {:?}. Resubmitting transactions.", at);
self.validated_pool.resubmit_pruned(
&at,
known_imported_hashes,
pruned_hashes,
reverified_transactions.into_iter().map(|(_, xt)| xt).collect(),
)
}
pub fn hash_of(&self, xt: &ExtrinsicFor<B>) -> ExtrinsicHash<B> {
self.validated_pool.api().hash_and_length(xt).0
}
fn resolve_block_number(&self, at: &BlockId<B::Block>) -> Result<NumberFor<B>, B::Error> {
self.validated_pool.api().block_id_to_number(at)
.and_then(|number| number.ok_or_else(||
error::Error::InvalidBlockId(format!("{:?}", at)).into()))
}
async fn verify(
&self,
at: &BlockId<B::Block>,
xts: impl IntoIterator<Item=(TransactionSource, ExtrinsicFor<B>)>,
check: CheckBannedBeforeVerify,
) -> Result<HashMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>, B::Error> {
let block_number = self.resolve_block_number(at)?;
let res = futures::future::join_all(
xts.into_iter()
.map(|(source, xt)| self.verify_one(at, block_number, source, xt, check))
).await.into_iter().collect::<HashMap<_, _>>();
Ok(res)
}
async fn verify_one(
&self,
block_id: &BlockId<B::Block>,
block_number: NumberFor<B>,
source: TransactionSource,
xt: ExtrinsicFor<B>,
check: CheckBannedBeforeVerify,
) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
return (hash.clone(), ValidatedTransaction::Invalid(hash, err.into()))
}
let validation_result = self.validated_pool.api().validate_transaction(
block_id,
source,
xt.clone(),
).await;
let status = match validation_result {
Ok(status) => status,
Err(e) => return (hash.clone(), ValidatedTransaction::Invalid(hash, e)),
};
let validity = match status {
Ok(validity) => {
if validity.provides.is_empty() {
ValidatedTransaction::Invalid(hash.clone(), error::Error::NoTagsProvided.into())
} else {
ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
source,
xt,
bytes,
validity,
)
}
},
Err(TransactionValidityError::Invalid(e)) =>
ValidatedTransaction::Invalid(hash.clone(), error::Error::InvalidTransaction(e).into()),
Err(TransactionValidityError::Unknown(e)) =>
ValidatedTransaction::Unknown(hash.clone(), error::Error::UnknownTransaction(e).into()),
};
(hash, validity)
}
pub fn validated_pool(&self) -> &ValidatedPool<B> {
&self.validated_pool
}
}
impl<B: ChainApi> Clone for Pool<B> {
fn clone(&self) -> Self {
Self {
validated_pool: self.validated_pool.clone(),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use parking_lot::Mutex;
use futures::executor::block_on;
use super::*;
use sp_transaction_pool::TransactionStatus;
use sp_runtime::{
traits::Hash,
transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource},
};
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId, Hashing};
use assert_matches::assert_matches;
use wasm_timer::Instant;
use crate::base_pool::Limit;
const INVALID_NONCE: u64 = 254;
const SOURCE: TransactionSource = TransactionSource::External;
#[derive(Clone, Debug, Default)]
struct TestApi {
delay: Arc<Mutex<Option<std::sync::mpsc::Receiver<()>>>>,
invalidate: Arc<Mutex<HashSet<H256>>>,
clear_requirements: Arc<Mutex<HashSet<H256>>>,
add_requirements: Arc<Mutex<HashSet<H256>>>,
}
impl ChainApi for TestApi {
type Block = Block;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;
fn validate_transaction(
&self,
at: &BlockId<Self::Block>,
_source: TransactionSource,
uxt: ExtrinsicFor<Self>,
) -> Self::ValidationFuture {
let hash = self.hash_and_length(&uxt).0;
let block_number = self.block_id_to_number(at).unwrap().unwrap();
let nonce = uxt.transfer().nonce;
if nonce > 0 {
let opt = self.delay.lock().take();
if let Some(delay) = opt {
if delay.recv().is_err() {
println!("Error waiting for delay!");
}
}
}
if self.invalidate.lock().contains(&hash) {
return futures::future::ready(Ok(InvalidTransaction::Custom(0).into()));
}
futures::future::ready(if nonce < block_number {
Ok(InvalidTransaction::Stale.into())
} else {
let mut transaction = ValidTransaction {
priority: 4,
requires: if nonce > block_number { vec![vec![nonce as u8 - 1]] } else { vec![] },
provides: if nonce == INVALID_NONCE { vec![] } else { vec![vec![nonce as u8]] },
longevity: 3,
propagate: true,
};
if self.clear_requirements.lock().contains(&hash) {
transaction.requires.clear();
}
if self.add_requirements.lock().contains(&hash) {
transaction.requires.push(vec![128]);
}
Ok(Ok(transaction))
})
}
fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<NumberFor<Self>>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(*num),
BlockId::Hash(_) => None,
})
}
fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error> {
Ok(match at {
BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(),
BlockId::Hash(_) => None,
})
}
fn hash_and_length(&self, uxt: &ExtrinsicFor<Self>) -> (BlockHash<Self>, usize) {
let encoded = uxt.encode();
let len = encoded.len();
(Hashing::hash(&encoded), len)
}
fn block_body(&self, _id: &BlockId<Self::Block>) -> Self::BodyFuture {
futures::future::ready(Ok(None))
}
}
fn uxt(transfer: Transfer) -> Extrinsic {
Extrinsic::Transfer {
transfer,
signature: Default::default(),
exhaust_resources_when_not_first: false,
}
}
fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::default().into())
}
#[test]
fn should_validate_and_import_transaction() {
let pool = pool();
let hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
}
#[test]
fn should_reject_if_temporarily_banned() {
let pool = pool();
let uxt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
pool.validated_pool.rotator().ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
let res = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt));
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
}
#[test]
fn should_notify_about_pool_events() {
let (stream, hash0, hash1) = {
let pool = pool();
let stream = pool.validated_pool().import_notification_stream();
let hash0 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
}))).unwrap();
let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 3,
}))).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
assert_eq!(pool.validated_pool().status().future, 1);
(stream, hash0, hash1)
};
let mut it = futures::executor::block_on_stream(stream);
assert_eq!(it.next(), Some(hash0));
assert_eq!(it.next(), Some(hash1));
assert_eq!(it.next(), None);
}
#[test]
fn should_clear_stale_transactions() {
let pool = pool();
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
}))).unwrap();
let hash3 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 3,
}))).unwrap();
pool.validated_pool.clear_stale(&BlockId::Number(5)).unwrap();
assert_eq!(pool.validated_pool().ready().count(), 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_eq!(pool.validated_pool().status().ready, 0);
assert!(pool.validated_pool.rotator().is_banned(&hash1));
assert!(pool.validated_pool.rotator().is_banned(&hash2));
assert!(pool.validated_pool.rotator().is_banned(&hash3));
}
#[test]
fn should_ban_mined_transactions() {
let pool = pool();
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
block_on(pool.prune_tags(&BlockId::Number(1), vec![vec![0]], vec![hash1.clone()])).unwrap();
assert!(pool.validated_pool.rotator().is_banned(&hash1));
}
#[test]
fn should_limit_futures() {
let limit = Limit {
count: 100,
total_bytes: 200,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
}))).unwrap();
assert_eq!(pool.validated_pool().status().future, 1);
let hash2 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(2)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 10,
}))).unwrap();
assert_eq!(pool.validated_pool().status().future, 1);
assert!(pool.validated_pool.rotator().is_banned(&hash1));
assert!(!pool.validated_pool.rotator().is_banned(&hash2));
}
#[test]
fn should_error_if_reject_immediately() {
let limit = Limit {
count: 100,
total_bytes: 10,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
}))).unwrap_err();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
}
#[test]
fn should_reject_transactions_with_no_provides() {
let pool = pool();
let err = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: INVALID_NONCE,
}))).unwrap_err();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
assert_matches!(err, error::Error::NoTagsProvided);
}
mod listener {
use super::*;
#[test]
fn should_trigger_ready_and_finalized() {
let pool = pool();
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![])).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
);
}
#[test]
fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
let pool = pool();
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
block_on(
pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![watcher.hash().clone()]),
).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 0);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(
stream.next(),
Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())),
);
}
#[test]
fn should_trigger_future_and_ready_after_promoted() {
let pool = pool();
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
}))).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
assert_eq!(pool.validated_pool().status().future, 1);
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
}))).unwrap();
assert_eq!(pool.validated_pool().status().ready, 2);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Future));
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
}
#[test]
fn should_trigger_invalid_and_ban() {
let pool = pool();
let uxt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
pool.validated_pool.remove_invalid(&[*watcher.hash()]);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
assert_eq!(stream.next(), None);
}
#[test]
fn should_trigger_broadcasted() {
let pool = pool();
let uxt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, uxt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let mut map = HashMap::new();
let peers = vec!["a".into(), "b".into(), "c".into()];
map.insert(*watcher.hash(), peers.clone());
pool.validated_pool().on_broadcasted(map);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
}
#[test]
fn should_trigger_dropped() {
let limit = Limit {
count: 1,
total_bytes: 1000,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
..Default::default()
}, TestApi::default().into());
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let watcher = block_on(pool.submit_and_watch(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(2)),
to: AccountId::from_h256(H256::from_low_u64_be(1)),
amount: 4,
nonce: 1,
});
block_on(pool.submit_one(&BlockId::Number(1), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
}
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let pool = Arc::new(Pool::new(Default::default(), api.into()));
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
});
let pool2 = pool.clone();
std::thread::spawn(move || {
block_on(pool2.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
ready.send(()).unwrap();
});
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 4,
nonce: 0,
});
let provides = vec![0_u8];
block_on(pool.submit_one(&BlockId::Number(0), SOURCE, xt)).unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
block_on(pool.prune_tags(&BlockId::Number(1), vec![provides], vec![])).unwrap();
assert_eq!(pool.validated_pool().status().ready, 0);
tx.send(()).unwrap();
is_ready.recv().unwrap();
assert_eq!(pool.validated_pool().status().ready, 1);
assert_eq!(pool.validated_pool().status().future, 0);
}
}
}