use futures::{prelude::*, ready};
use futures::channel::mpsc::{self, UnboundedReceiver};
#[cfg(feature = "std")]
use log::trace;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::hash::Hash;
use crate::round::State as RoundState;
use crate::{
CatchUp, Chain, Commit, CompactCommit, Equivocation, Message, Prevote, Precommit,
PrimaryPropose, SignedMessage, BlockNumberOps, validate_commit, CommitValidationResult,
HistoricalVotes,
};
use crate::voter_set::VoterSet;
use crate::weights::VoteWeight;
use past_rounds::PastRounds;
use voting_round::{VotingRound, State as VotingRoundState};
mod past_rounds;
mod voting_round;
pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
type Timer: Future<Output=Result<(),Self::Error>> + Unpin;
type Id: Ord + Clone + Eq + ::std::fmt::Debug;
type Signature: Eq + Clone;
type In: Stream<Item=Result<SignedMessage<H, N, Self::Signature, Self::Id>, Self::Error>> + Unpin;
type Out: Sink<Message<H, N>, Error=Self::Error> + Unpin;
type Error: From<crate::Error> + ::std::error::Error;
fn round_data(&self, round: u64) -> RoundData<
Self::Id,
Self::Timer,
Self::In,
Self::Out,
>;
fn round_commit_timer(&self) -> Self::Timer;
fn proposed(&self, round: u64, propose: PrimaryPropose<H, N>) -> Result<(), Self::Error>;
fn prevoted(&self, round: u64, prevote: Prevote<H, N>) -> Result<(), Self::Error>;
fn precommitted(&self, round: u64, precommit: Precommit<H, N>) -> Result<(), Self::Error>;
fn completed(
&self,
round: u64,
state: RoundState<H, N>,
base: (H, N),
votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
) -> Result<(), Self::Error>;
fn concluded(
&self,
round: u64,
state: RoundState<H, N>,
base: (H, N),
votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
) -> Result<(), Self::Error>;
fn finalize_block(&self, hash: H, number: N, round: u64, commit: Commit<H, N, Self::Signature, Self::Id>) -> Result<(), Self::Error>;
fn prevote_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Prevote<H, N>, Self::Signature>);
fn precommit_equivocation(&self, round: u64, equivocation: Equivocation<Self::Id, Precommit<H, N>, Self::Signature>);
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommunicationOut<H, N, S, Id> {
Commit(u64, Commit<H, N, S, Id>),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitProcessingOutcome {
Good(GoodCommit),
Bad(BadCommit),
}
#[cfg(any(test, feature = "test-helpers"))]
impl CommitProcessingOutcome {
pub fn good() -> CommitProcessingOutcome {
CommitProcessingOutcome::Good(GoodCommit::new())
}
pub fn bad() -> CommitProcessingOutcome {
CommitProcessingOutcome::Bad(CommitValidationResult::<(), ()>::default().into())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCommit {
_priv: (),
}
impl GoodCommit {
pub(crate) fn new() -> Self {
GoodCommit { _priv: () }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCommit {
_priv: (),
num_precommits: usize,
num_duplicated_precommits: usize,
num_equivocations: usize,
num_invalid_voters: usize,
}
impl BadCommit {
pub fn num_precommits(&self) -> usize {
self.num_precommits
}
pub fn num_duplicated(&self) -> usize {
self.num_duplicated_precommits
}
pub fn num_equivocations(&self) -> usize {
self.num_equivocations
}
pub fn num_invalid_voters(&self) -> usize {
self.num_invalid_voters
}
}
impl<H, N> From<CommitValidationResult<H, N>> for BadCommit {
fn from(r: CommitValidationResult<H, N>) -> Self {
BadCommit {
num_precommits: r.num_precommits,
num_duplicated_precommits: r.num_duplicated_precommits,
num_equivocations: r.num_equivocations,
num_invalid_voters: r.num_invalid_voters,
_priv: (),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatchUpProcessingOutcome {
Good(GoodCatchUp),
Bad(BadCatchUp),
Useless,
}
#[cfg(any(test, feature = "test-helpers"))]
impl CatchUpProcessingOutcome {
pub fn bad() -> CatchUpProcessingOutcome {
CatchUpProcessingOutcome::Bad(BadCatchUp::new())
}
pub fn good() -> CatchUpProcessingOutcome {
CatchUpProcessingOutcome::Good(GoodCatchUp::new())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCatchUp {
_priv: (),
}
impl GoodCatchUp {
pub(crate) fn new() -> Self {
GoodCatchUp { _priv: () }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCatchUp {
_priv: (),
}
impl BadCatchUp {
pub(crate) fn new() -> Self {
BadCatchUp { _priv: () }
}
}
pub enum Callback<O> {
Blank,
Work(Box<dyn FnMut(O) + Send>),
}
#[cfg(any(test, feature = "test-helpers"))]
impl<O> Clone for Callback<O> {
fn clone(&self) -> Self {
Callback::Blank
}
}
impl<O> Callback<O> {
pub fn run(&mut self, o: O) {
match self {
Callback::Blank => {},
Callback::Work(cb) => cb(o),
}
}
}
#[cfg_attr(any(test, feature = "test-helpers"), derive(Clone))]
pub enum CommunicationIn<H, N, S, Id> {
Commit(u64, CompactCommit<H, N, S, Id>, Callback<CommitProcessingOutcome>),
CatchUp(CatchUp<H, N, S, Id>, Callback<CatchUpProcessingOutcome>),
}
impl<H, N, S, Id> Unpin for CommunicationIn<H, N, S, Id> {}
pub struct RoundData<Id, Timer, Input, Output> {
pub voter_id: Option<Id>,
pub prevote_timer: Timer,
pub precommit_timer: Timer,
pub incoming: Input,
pub outgoing: Output,
}
struct Buffered<S, I> {
inner: S,
buffer: VecDeque<I>,
}
impl<S: Sink<I> + Unpin, I> Buffered<S, I> {
fn new(inner: S) -> Buffered<S, I> {
Buffered {
buffer: VecDeque::new(),
inner
}
}
fn push(&mut self, item: I) {
self.buffer.push_back(item);
}
fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
let polled = self.schedule_all(cx)?;
match polled {
Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx),
Poll::Pending => {
ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?;
Poll::Pending
}
}
}
fn schedule_all(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
while !self.buffer.is_empty() {
ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?;
let item = self.buffer.pop_front()
.expect("we checked self.buffer.is_empty() just above; qed");
Sink::start_send(Pin::new(&mut self.inner), item)?;
}
Poll::Ready(Ok(()))
}
}
type FinalizedNotification<H, N, E> = (
H,
N,
u64,
Commit<H, N, <E as Environment<H, N>>::Signature, <E as Environment<H, N>>::Id>,
);
fn instantiate_last_round<H, N, E: Environment<H, N>>(
voters: VoterSet<E::Id>,
last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
last_round_number: u64,
last_round_base: (H, N),
finalized_sender: mpsc::UnboundedSender<FinalizedNotification<H, N, E>>,
env: Arc<E>,
) -> Option<VotingRound<H, N, E>> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
{
let last_round_tracker = crate::round::Round::new(crate::round::RoundParams {
voters,
base: last_round_base,
round_number: last_round_number,
});
let mut last_round = VotingRound::completed(
last_round_tracker,
finalized_sender,
None,
env,
);
for vote in last_round_votes {
last_round.handle_vote(vote).ok()?;
}
if last_round.round_state().completable {
Some(last_round)
} else {
None
}
}
struct InnerVoterState<H, N, E> where
H: Clone + Ord + std::fmt::Debug,
N: BlockNumberOps,
E: Environment<H, N>,
{
best_round: VotingRound<H, N, E>,
past_rounds: PastRounds<H, N, E>,
}
pub struct Voter<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error=E::Error> + Unpin,
{
env: Arc<E>,
voters: VoterSet<E::Id>,
inner: Arc<RwLock<InnerVoterState<H, N, E>>>,
finalized_notifications: UnboundedReceiver<FinalizedNotification<H, N, E>>,
last_finalized_number: N,
global_in: GlobalIn,
global_out: Buffered<GlobalOut, CommunicationOut<H, N, E::Signature, E::Id>>,
last_finalized_in_rounds: (H, N),
}
impl<'a, H: 'a, N, E: 'a, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Ord + ::std::fmt::Debug + Sync + Send,
N: BlockNumberOps + Sync + Send,
E: Environment<H, N> + Sync + Send,
GlobalIn: Stream<Item=Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error=E::Error> + Unpin,
{
pub fn voter_state(&self) -> Box<dyn VoterState<E::Id> + 'a + Send + Sync>
where
<E as Environment<H, N>>::Signature: Send + Sync,
<E as Environment<H, N>>::Id: Hash + Send + Sync,
<E as Environment<H, N>>::Timer: Send + Sync,
<E as Environment<H, N>>::Out: Send + Sync,
<E as Environment<H, N>>::In: Send + Sync,
{
Box::new(SharedVoterState(self.inner.clone()))
}
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error=E::Error> + Unpin,
{
pub fn new(
env: Arc<E>,
voters: VoterSet<E::Id>,
global_comms: (GlobalIn, GlobalOut),
last_round_number: u64,
last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
last_round_base: (H, N),
last_finalized: (H, N),
) -> Self {
let (finalized_sender, finalized_notifications) = mpsc::unbounded();
let last_finalized_number = last_finalized.1;
let mut past_rounds = PastRounds::new();
let mut last_round_state = crate::bridge_state::bridge_state(RoundState::genesis(last_round_base.clone())).1;
if last_round_number > 0 {
let maybe_completed_last_round = instantiate_last_round(
voters.clone(),
last_round_votes,
last_round_number,
last_round_base,
finalized_sender.clone(),
env.clone(),
);
if let Some(mut last_round) = maybe_completed_last_round {
last_round_state = last_round.bridge_state();
past_rounds.push(&*env, last_round);
}
}
let best_round = VotingRound::new(
last_round_number + 1,
voters.clone(),
last_finalized.clone(),
Some(last_round_state),
finalized_sender,
env.clone(),
);
let (global_in, global_out) = global_comms;
let inner = Arc::new(RwLock::new(InnerVoterState {
best_round,
past_rounds,
}));
Voter {
env,
voters,
inner,
finalized_notifications,
last_finalized_number,
last_finalized_in_rounds: last_finalized,
global_in,
global_out: Buffered::new(global_out),
}
}
fn prune_background_rounds(&mut self, cx: &mut Context) -> Result<(), E::Error> {
{
let mut inner = self.inner.write();
while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut inner.past_rounds), cx) {
let (number, commit) = item?;
self.global_out.push(CommunicationOut::Commit(number, commit));
}
}
while let Poll::Ready(res) = Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx) {
let inner = self.inner.clone();
let mut inner = inner.write();
let (f_hash, f_num, round, commit) =
res.expect("one sender always kept alive in self.best_round; qed");
inner.past_rounds.update_finalized(f_num);
if self.set_last_finalized_number(f_num) {
self.env.finalize_block(f_hash.clone(), f_num, round, commit)?;
}
if f_num > self.last_finalized_in_rounds.1 {
self.last_finalized_in_rounds = (f_hash, f_num);
}
}
Ok(())
}
fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> {
while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) {
match item? {
CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => {
trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}",
round_number,
commit.target_number,
commit.target_hash,
);
let commit: Commit<_, _, _, _> = commit.into();
let mut inner = self.inner.write();
if let Some(commit) = inner.past_rounds.import_commit(round_number, commit) {
let validation_result = validate_commit(&commit, &self.voters, &*self.env)?;
if let Some((finalized_hash, finalized_number)) = validation_result.ghost {
let last_finalized_number = &mut self.last_finalized_number;
inner.past_rounds.update_finalized(finalized_number);
if finalized_number > *last_finalized_number {
*last_finalized_number = finalized_number;
self.env.finalize_block(finalized_hash, finalized_number, round_number, commit)?;
}
process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
} else {
process_commit_outcome.run(
CommitProcessingOutcome::Bad(BadCommit::from(validation_result)),
);
}
} else {
process_commit_outcome.run(CommitProcessingOutcome::Good(GoodCommit::new()));
}
}
CommunicationIn::CatchUp(catch_up, mut process_catch_up_outcome) => {
trace!(target: "afg", "Got catch-up message for round {}", catch_up.round_number);
let mut inner = self.inner.write();
let round = if let Some(round) = validate_catch_up(
catch_up,
&*self.env,
&self.voters,
inner.best_round.round_number(),
) {
round
} else {
process_catch_up_outcome.run(CatchUpProcessingOutcome::Bad(BadCatchUp::new()));
return Ok(());
};
let state = round.state();
let mut just_completed = VotingRound::completed(
round,
inner.best_round.finalized_sender(),
None,
self.env.clone(),
);
let new_best = VotingRound::new(
just_completed.round_number() + 1,
self.voters.clone(),
self.last_finalized_in_rounds.clone(),
Some(just_completed.bridge_state()),
inner.best_round.finalized_sender(),
self.env.clone(),
);
if let Some((f_hash, f_num)) = state.finalized.clone() {
if f_num > self.last_finalized_in_rounds.1 {
self.last_finalized_in_rounds = (f_hash, f_num);
}
}
self.env.completed(
just_completed.round_number(),
just_completed.round_state(),
just_completed.dag_base(),
just_completed.historical_votes(),
)?;
inner.past_rounds.push(&*self.env, just_completed);
let old_best = std::mem::replace(&mut inner.best_round, new_best);
inner.past_rounds.push(
&*self.env,
old_best,
);
process_catch_up_outcome.run(CatchUpProcessingOutcome::Good(GoodCatchUp::new()));
},
}
}
Ok(())
}
fn process_best_round(&mut self, cx: &mut Context) -> Poll<Result<(), E::Error>> {
{
let mut inner = self.inner.write();
let should_start_next = {
let completable = match inner.best_round.poll(cx)? {
Poll::Ready(()) => true,
Poll::Pending => false,
};
let precommitted = match inner.best_round.state() {
Some(&VotingRoundState::Precommitted) => true,
_ => false,
};
completable && precommitted
};
if !should_start_next { return Poll::Pending }
trace!(target: "afg", "Best round at {} has become completable. Starting new best round at {}",
inner.best_round.round_number(),
inner.best_round.round_number() + 1,
);
}
self.completed_best_round()?;
self.poll_unpin(cx)
}
fn completed_best_round(&mut self) -> Result<(), E::Error> {
let mut inner = self.inner.write();
self.env.completed(
inner.best_round.round_number(),
inner.best_round.round_state(),
inner.best_round.dag_base(),
inner.best_round.historical_votes(),
)?;
let old_round_number = inner.best_round.round_number();
let next_round = VotingRound::new(
old_round_number + 1,
self.voters.clone(),
self.last_finalized_in_rounds.clone(),
Some(inner.best_round.bridge_state()),
inner.best_round.finalized_sender(),
self.env.clone(),
);
let old_round = ::std::mem::replace(&mut inner.best_round, next_round);
inner.past_rounds.push(&*self.env, old_round);
Ok(())
}
fn set_last_finalized_number(&mut self, finalized_number: N) -> bool {
let last_finalized_number = &mut self.last_finalized_number;
if finalized_number > *last_finalized_number {
*last_finalized_number = finalized_number;
return true;
}
false
}
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Future for Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error=E::Error> + Unpin,
{
type Output = Result<(), E::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E::Error>> {
self.process_incoming(cx)?;
self.prune_background_rounds(cx)?;
let _ = self.global_out.poll(cx)?;
self.process_best_round(cx)
}
}
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Unpin for Voter<H, N, E, GlobalIn, GlobalOut> where
H: Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
GlobalIn: Stream<Item=Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error=E::Error> + Unpin,
{
}
pub trait VoterState<Id: Eq + std::hash::Hash> {
fn get(&self) -> report::VoterState<Id>;
}
pub mod report {
use std::collections::{HashMap, HashSet};
use crate::weights::{VoteWeight, VoterWeight};
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(test, derive(Debug))]
pub struct RoundState<Id: Eq + std::hash::Hash> {
pub total_weight: VoterWeight,
pub threshold_weight: VoterWeight,
pub prevote_current_weight: VoteWeight,
pub prevote_ids: HashSet<Id>,
pub precommit_current_weight: VoteWeight,
pub precommit_ids: HashSet<Id>,
}
#[derive(PartialEq, Eq)]
#[cfg_attr(test, derive(Debug))]
pub struct VoterState<Id: Eq + std::hash::Hash> {
pub background_rounds: HashMap<u64, RoundState<Id>>,
pub best_round: (u64, RoundState<Id>),
}
}
struct SharedVoterState<H, N, E>(Arc<RwLock<InnerVoterState<H, N, E>>>) where
H: Clone + Ord + std::fmt::Debug,
N: BlockNumberOps,
E: Environment<H, N>;
impl<H, N, E> VoterState<E::Id> for SharedVoterState<H, N, E> where
H: Clone + Eq + Ord + std::fmt::Debug,
N: BlockNumberOps,
E: Environment<H, N>,
<E as Environment<H, N>>::Id: Hash,
{
fn get(&self) -> report::VoterState<E::Id> {
let to_round_state = |voting_round: &VotingRound<H, N, E>| {
(
voting_round.round_number(),
report::RoundState {
total_weight: voting_round.voters().total_weight(),
threshold_weight: voting_round.voters().threshold(),
prevote_current_weight: voting_round.prevote_weight(),
prevote_ids: voting_round.prevote_ids().collect(),
precommit_current_weight: voting_round.precommit_weight(),
precommit_ids: voting_round.precommit_ids().collect(),
}
)
};
let lock = self.0.read();
let best_round = to_round_state(&lock.best_round);
let background_rounds = lock
.past_rounds
.voting_rounds()
.map(to_round_state)
.collect();
report::VoterState {
best_round,
background_rounds,
}
}
}
fn validate_catch_up<H, N, S, I, E>(
catch_up: CatchUp<H, N, S, I>,
env: &E,
voters: &VoterSet<I>,
best_round_number: u64,
) -> Option<crate::round::Round<I, H, N, S>> where
H: Clone + Eq + Ord + std::fmt::Debug,
N: BlockNumberOps + std::fmt::Debug,
S: Clone + Eq,
I: Clone + Eq + std::fmt::Debug + Ord,
E: Environment<H, N>,
{
if catch_up.round_number <= best_round_number {
trace!(target: "afg", "Ignoring because best round number is {}",
best_round_number);
return None;
}
{
let mut map = std::collections::BTreeMap::new();
for prevote in &catch_up.prevotes {
if !voters.contains(&prevote.id) {
trace!(target: "afg",
"Ignoring invalid catch up, invalid voter: {:?}",
prevote.id,
);
return None;
}
map.entry(prevote.id.clone()).or_insert((false, false)).0 = true;
}
for precommit in &catch_up.precommits {
if !voters.contains(&precommit.id) {
trace!(target: "afg",
"Ignoring invalid catch up, invalid voter: {:?}",
precommit.id,
);
return None;
}
map.entry(precommit.id.clone()).or_insert((false, false)).1 = true;
}
let (pv, pc) = map.into_iter().fold(
(VoteWeight(0), VoteWeight(0)),
|(mut pv, mut pc), (id, (prevoted, precommitted))| {
if let Some(v) = voters.get(&id) {
if prevoted {
pv = pv + v.weight();
}
if precommitted {
pc = pc + v.weight();
}
}
(pv, pc)
},
);
let threshold = voters.threshold();
if pv < threshold || pc < threshold {
trace!(target: "afg",
"Ignoring invalid catch up, missing voter threshold"
);
return None;
}
}
let mut round = crate::round::Round::new(crate::round::RoundParams {
round_number: catch_up.round_number,
voters: voters.clone(),
base: (catch_up.base_hash.clone(), catch_up.base_number),
});
for crate::SignedPrevote { prevote, id, signature } in catch_up.prevotes {
match round.import_prevote(env, prevote, id, signature) {
Ok(_) => {},
Err(e) => {
trace!(target: "afg",
"Ignoring invalid catch up, error importing prevote: {:?}",
e,
);
return None;
},
}
}
for crate::SignedPrecommit { precommit, id, signature } in catch_up.precommits {
match round.import_precommit(env, precommit, id, signature) {
Ok(_) => {},
Err(e) => {
trace!(target: "afg",
"Ignoring invalid catch up, error importing precommit: {:?}",
e,
);
return None;
},
}
}
let state = round.state();
if !state.completable {
return None;
}
Some(round)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SignedPrecommit;
use crate::testing::{
self,
chain::GENESIS_HASH,
environment::{Environment, Id, Signature},
};
use crate::weights::{VoteWeight, VoterWeight};
use futures::executor::LocalPool;
use futures::task::SpawnExt;
use futures_timer::Delay;
use std::iter;
use std::time::Duration;
use std::collections::HashSet;
#[test]
fn talking_to_myself() {
let local_id = Id(5);
let voters = VoterSet::new(std::iter::once((local_id, 100))).unwrap();
let (network, routing_task) = testing::environment::make_network();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters,
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
let mut pool = LocalPool::new();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
pool.spawner().spawn(routing_task).unwrap();
pool.run_until(finalized
.take_while(|&(_, n, _)| future::ready(n < 6))
.for_each(|_| future::ready(())))
}
#[test]
fn finalizing_at_fault_threshold() {
let voters = VoterSet::new((0..10).map(|i| (Id(i), 1))).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let mut pool = LocalPool::new();
let finalized_streams = (0..7).map(|i| {
let local_id = Id(i);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
Vec::new(),
last_finalized,
last_finalized,
);
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
finalized
.take_while(|&(_, n, _)| future::ready(n < 6))
.for_each(|_| future::ready(()))
}).collect::<Vec<_>>();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
pool.run_until(future::join_all(finalized_streams.into_iter()));
}
#[test]
fn exposing_voter_state() {
let num_voters = 10;
let voters_online = 7;
let voters = VoterSet::new((0..num_voters).map(|i| (Id(i), 1))).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let mut pool = LocalPool::new();
let (finalized_streams, voter_states): (Vec<_>, Vec<_>) = (0..voters_online).map(|i| {
let local_id = Id(i);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let finalized = env.finalized_stream();
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
Vec::new(),
last_finalized,
last_finalized,
);
let voter_state = voter.voter_state();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
(
finalized
.take_while(|&(_, n, _)| future::ready(n < 6))
.for_each(|_| future::ready(())),
voter_state,
)
}).unzip();
let voter_state = &voter_states[0];
voter_states.iter().all(|vs| vs.get() == voter_state.get());
let expected_round_state = report::RoundState::<Id> {
total_weight: VoterWeight::new(num_voters.into()).expect("nonzero"),
threshold_weight: VoterWeight::new(voters_online.into()).expect("nonzero"),
prevote_current_weight: VoteWeight(0),
prevote_ids: Default::default(),
precommit_current_weight: VoteWeight(0),
precommit_ids: Default::default(),
};
assert_eq!(
voter_state.get(),
report::VoterState {
background_rounds: Default::default(),
best_round: (1, expected_round_state.clone()),
}
);
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
pool.run_until(future::join_all(finalized_streams.into_iter()));
assert_eq!(voter_state.get().best_round, (2, expected_round_state.clone()));
}
#[test]
fn broadcast_commit() {
let local_id = Id(5);
let voters = VoterSet::new([(local_id, 100)].iter().cloned()).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let (commits, _) = network.make_global_comms();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
let mut pool = LocalPool::new();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
pool.spawner().spawn(routing_task).unwrap();
pool.run_until(commits.take(1).for_each(|_| future::ready(())))
}
#[test]
fn broadcast_commit_only_if_newer() {
let local_id = Id(5);
let test_id = Id(42);
let voters = VoterSet::new([
(local_id, 100),
(test_id, 201),
].iter().cloned()).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let (commits_stream, commits_sink) = network.make_global_comms();
let (round_stream, round_sink) = network.make_round_comms(1, test_id);
let prevote = Message::Prevote(Prevote {
target_hash: "E",
target_number: 6,
});
let precommit = Message::Precommit(Precommit {
target_hash: "E",
target_number: 6,
});
let commit = (1, Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: Signature(test_id.0),
id: test_id
}],
});
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
0,
Vec::new(),
last_finalized,
last_finalized,
);
let mut pool = LocalPool::new();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting: {:?}"))).unwrap();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
pool.spawner().spawn(
round_stream.into_future()
.then(|(value, stream)| {
assert!(match value {
Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true,
_ => false,
});
let votes = vec![prevote, precommit].into_iter().map(Result::Ok);
futures::stream::iter(votes).forward(round_sink).map(|_| stream)
})
.then(|stream| {
stream.take_while(|value| match value {
Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false),
_ => future::ready(true),
}).for_each(|_| future::ready(()))
})
.then(|_| {
stream::iter(iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink)
})
.map(|_| ())
).unwrap();
let res = pool.run_until(
commits_stream.into_future()
.then(|(_, stream)| {
let await_second = stream.take(1)
.for_each(|_| future::ready(()));
let delay = Delay::new(Duration::from_millis(500));
future::select(await_second, delay)
}));
match res {
future::Either::Right(((), _work)) => {
}
_ => panic!("Unexpected result")
}
}
#[test]
fn import_commit_for_any_round() {
let local_id = Id(5);
let test_id = Id(42);
let voters = VoterSet::new([
(local_id, 100),
(test_id, 201),
].iter().cloned()).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let (_, commits_sink) = network.make_global_comms();
let commit = Commit {
target_hash: "E",
target_number: 6,
precommits: vec![SignedPrecommit {
precommit: Precommit { target_hash: "E", target_number: 6 },
signature: Signature(test_id.0),
id: test_id
}],
};
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
global_comms,
1,
Vec::new(),
last_finalized,
last_finalized,
);
let mut pool = LocalPool::new();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
pool.spawner().spawn(
stream::iter(iter::once(Ok(CommunicationOut::Commit(0, commit.clone()))))
.forward(commits_sink)
.map(|_| ())).unwrap();
let finalized = pool.run_until(
env.finalized_stream()
.into_future()
.map(move |(msg, _)| msg.unwrap().2));
assert_eq!(finalized, commit);
}
#[test]
fn skips_to_latest_round_after_catch_up() {
let voters = VoterSet::new((0..3).map(|i| (Id(i), 1u64))).expect("nonempty");
let total_weight = voters.total_weight();
let threshold_weight = voters.threshold();
let voter_ids: HashSet<Id> = (0..3).map(|i| Id(i)).collect();
let (network, routing_task) = testing::environment::make_network();
let mut pool = LocalPool::new();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
let (env, unsynced_voter) = {
let local_id = Id(4);
let env = Arc::new(Environment::new(network.clone(), local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters.clone(),
network.make_global_comms(),
0,
Vec::new(),
last_finalized,
last_finalized,
);
(env, voter)
};
let pv = |id| crate::SignedPrevote {
prevote: crate::Prevote { target_hash: "C", target_number: 4 },
id: Id(id),
signature: Signature(99),
};
let pc = |id| crate::SignedPrecommit {
precommit: crate::Precommit { target_hash: "C", target_number: 4 },
id: Id(id),
signature: Signature(99),
};
network.send_message(CommunicationIn::CatchUp(
CatchUp {
base_number: 1,
base_hash: GENESIS_HASH,
round_number: 5,
prevotes: vec![pv(0), pv(1), pv(2)],
precommits: vec![pc(0), pc(1), pc(2)],
},
Callback::Blank,
));
let voter_state = unsynced_voter.voter_state();
assert_eq!(voter_state.get().background_rounds.get(&5), None);
pool.spawner().spawn(unsynced_voter.map(|_| ())).unwrap();
let caught_up = future::poll_fn(|_| {
if voter_state.get().best_round.0 == 6 {
Poll::Ready(())
} else {
Poll::Pending
}
});
let finalized = env.finalized_stream().take(1).into_future();
pool.run_until(caught_up.then(|_| finalized.map(|_| ())));
assert_eq!(
voter_state.get().best_round,
(
6,
report::RoundState::<Id> {
total_weight,
threshold_weight,
prevote_current_weight: VoteWeight(0),
prevote_ids: Default::default(),
precommit_current_weight: VoteWeight(0),
precommit_ids: Default::default(),
}
)
);
assert_eq!(
voter_state.get().background_rounds.get(&5),
Some(&report::RoundState::<Id> {
total_weight,
threshold_weight,
prevote_current_weight: VoteWeight(3),
prevote_ids: voter_ids.clone(),
precommit_current_weight: VoteWeight(3),
precommit_ids: voter_ids,
})
);
}
#[test]
fn pick_up_from_prior_without_grandparent_state() {
let local_id = Id(5);
let voters = VoterSet::new(std::iter::once((local_id, 100))).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network, local_id));
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let voter = Voter::new(
env.clone(),
voters,
global_comms,
10,
Vec::new(),
last_finalized,
last_finalized,
);
let mut pool = LocalPool::new();
pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
pool.run_until(env.finalized_stream()
.take_while(|&(_, n, _)| future::ready(n < 6))
.for_each(|_| future::ready(())))
}
#[test]
fn pick_up_from_prior_with_grandparent_state() {
let local_id = Id(99);
let voters = VoterSet::new((0..100).map(|i| (Id(i), 1))).expect("nonempty");
let (network, routing_task) = testing::environment::make_network();
let global_comms = network.make_global_comms();
let env = Arc::new(Environment::new(network.clone(), local_id));
let outer_env = env.clone();
let last_finalized = env.with_chain(|chain| {
chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
chain.last_finalized()
});
let mut pool = LocalPool::new();
let mut last_round_votes = Vec::new();
for id in 0..67 {
let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
let precommit = if id < 66 {
Message::Precommit(Precommit { target_hash: "D", target_number: 5 })
} else {
Message::Precommit(Precommit { target_hash: "E", target_number: 6 })
};
last_round_votes.push(SignedMessage {
message: prevote.clone(),
signature: Signature(id),
id: Id(id),
});
last_round_votes.push(SignedMessage {
message: precommit.clone(),
signature: Signature(id),
id: Id(id),
});
let (_, round_sink) = network.make_round_comms(2, Id(id));
let msgs = stream::iter(iter::once(Ok(prevote)).chain(iter::once(Ok(precommit))));
pool.spawner().spawn(msgs.forward(round_sink).map(|r| r.unwrap())).unwrap();
}
let sender = Id(67);
let (_, round_sink) = network.make_round_comms(1, sender);
let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 3 });
pool.spawner().spawn(
stream::iter(iter::once(Ok(last_precommit)))
.forward(round_sink)
.map(|r| r.unwrap())).unwrap();
let voter = Voter::new(
env.clone(),
voters,
global_comms,
1,
last_round_votes,
last_finalized,
last_finalized,
);
pool.spawner().spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ())).unwrap();
pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
let (round_stream, _) = network.make_round_comms(3, Id(1000));
pool.run_until(round_stream
.skip_while(move |v| {
let v = v.as_ref().unwrap();
if let Message::Prevote(_) = v.message {
future::ready(v.id != local_id)
} else {
future::ready(true)
}
})
.into_future()
.map(|_| ()));
assert_eq!(outer_env.last_completed_and_concluded(), (2, 1));
}
}