#![warn(missing_docs)]
use futures::{
prelude::*,
StreamExt,
};
use log::{debug, error, info};
use sc_client_api::{
backend::{AuxStore, Backend},
LockImportRun, BlockchainEvents, CallExecutor,
ExecutionStrategy, Finalizer, TransactionFor, ExecutorProvider,
};
use parity_scale_codec::{Decode, Encode};
use prometheus_endpoint::{PrometheusError, Registry};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, Error as ClientError, HeaderMetadata};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{NumberFor, Block as BlockT, DigestFor, Zero};
use sp_inherents::InherentDataProviders;
use sp_consensus::{SelectChain, BlockImport};
use sp_core::{
crypto::Public,
traits::BareCryptoStorePtr,
};
use sp_application_crypto::AppKey;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG};
use parking_lot::RwLock;
use finality_grandpa::Error as GrandpaError;
use finality_grandpa::{voter, voter_set::VoterSet};
pub use finality_grandpa::BlockNumberOps;
use std::{fmt, io};
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use std::task::{Poll, Context};
/// Logs to the `"afg"` target at a level selected at runtime.
///
/// The first argument is a boolean expression: when it is `true` the message
/// is emitted at `Debug` level, otherwise at `Info` level. All remaining
/// arguments are forwarded unchanged to `log::log!` as the format string and
/// its parameters.
macro_rules! afg_log {
    ($condition:expr, $($msg: expr),+ $(,)?) => {
        {
            let log_level = match $condition {
                true => log::Level::Debug,
                false => log::Level::Info,
            };
            log::log!(target: "afg", log_level, $($msg),+);
        }
    };
}
mod authorities;
mod aux_schema;
mod communication;
mod consensus_changes;
mod environment;
mod finality_proof;
mod import;
mod justification;
mod light_import;
mod notification;
mod observer;
mod until_imported;
mod voting_rule;
pub use authorities::SharedAuthoritySet;
pub use finality_proof::{FinalityProofFragment, FinalityProofProvider, StorageAndProofProvider};
pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
pub use import::GrandpaBlockImport;
pub use justification::GrandpaJustification;
pub use light_import::{light_block_import, GrandpaLightBlockImport};
pub use voting_rule::{
BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRulesBuilder
};
pub use finality_grandpa::voter::report;
use aux_schema::PersistentData;
use environment::{Environment, VoterSetState};
use until_imported::UntilGlobalMessageBlocksImported;
use communication::{NetworkBridge, Network as NetworkT};
use sp_finality_grandpa::{AuthorityList, AuthoritySignature, SetId};
pub use sp_finality_grandpa::{AuthorityId, AuthorityPair, GrandpaApi, ScheduledChange};
use std::marker::PhantomData;
#[cfg(test)]
mod tests;
/// A `finality_grandpa::Message` specialised to the hash and number types of `Block`.
pub type Message<Block> = finality_grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A `finality_grandpa::SignedMessage` for `Block`, signed with
/// `AuthoritySignature` and identified by `AuthorityId`.
pub type SignedMessage<Block> = finality_grandpa::SignedMessage<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A `finality_grandpa::PrimaryPropose` specialised to the hash and number types of `Block`.
pub type PrimaryPropose<Block> = finality_grandpa::PrimaryPropose<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A `finality_grandpa::Prevote` specialised to the hash and number types of `Block`.
pub type Prevote<Block> = finality_grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A `finality_grandpa::Precommit` specialised to the hash and number types of `Block`.
pub type Precommit<Block> = finality_grandpa::Precommit<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A `finality_grandpa::CatchUp` message for `Block`, using this crate's
/// signature and authority id types.
pub type CatchUp<Block> = finality_grandpa::CatchUp<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A `finality_grandpa::Commit` message for `Block`, using this crate's
/// signature and authority id types.
pub type Commit<Block> = finality_grandpa::Commit<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A `finality_grandpa::CompactCommit` message for `Block`, using this crate's
/// signature and authority id types.
pub type CompactCommit<Block> = finality_grandpa::CompactCommit<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
// `voter::CommunicationIn` keyed by the block's own hash type.
type CommunicationIn<Block> = finality_grandpa::voter::CommunicationIn<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
// Like `CommunicationIn`, but generic over the hash type `H` instead of
// fixing it to `Block::Hash`.
type CommunicationInH<Block, H> = finality_grandpa::voter::CommunicationIn<
H,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
// `voter::CommunicationOut` generic over the hash type `H`; the outgoing
// counterpart of `CommunicationInH`.
type CommunicationOutH<Block, H> = finality_grandpa::voter::CommunicationOut<
H,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A cloneable handle to the state reporter of the currently running voter.
///
/// The handle starts out empty and is filled in whenever a new voter is
/// built; readers obtain a snapshot through `voter_state`.
pub struct SharedVoterState {
// `None` until a voter has been installed via `reset`.
inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
}
impl SharedVoterState {
    /// Creates a handle that does not yet refer to any voter.
    pub fn empty() -> Self {
        let inner = Arc::new(RwLock::new(None));
        Self { inner }
    }

    /// Replaces the stored voter state with `voter_state`.
    ///
    /// Waits at most one second for the write lock. Returns `None` when the
    /// lock could not be acquired in time (the stored state is then left
    /// untouched) and `Some(())` on success.
    fn reset(
        &self,
        voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
    ) -> Option<()> {
        self.inner
            .try_write_for(Duration::from_secs(1))
            .map(|mut slot| {
                *slot = Some(voter_state);
            })
    }

    /// Returns a report of the current voter state, or `None` if no voter
    /// has been installed yet.
    pub fn voter_state(&self) -> Option<voter::report::VoterState<AuthorityId>> {
        let slot = self.inner.read();
        slot.as_ref().map(|vs| vs.get())
    }
}
impl Clone for SharedVoterState {
fn clone(&self) -> Self {
SharedVoterState { inner: self.inner.clone() }
}
}
/// Configuration for the GRANDPA service.
#[derive(Clone)]
pub struct Config {
/// Gossip timing parameter. NOTE(review): presumably the expected time for
/// a message to propagate through the network — confirm against the
/// communication module.
pub gossip_duration: Duration,
/// Justification period. NOTE(review): presumably expressed in blocks —
/// confirm against the environment module.
pub justification_period: u32,
/// Whether the observer is enabled. `run_grandpa_voter` always overrides
/// this to `false` before starting the voter.
pub observer_enabled: bool,
/// Whether the node is running as an authority.
pub is_authority: bool,
/// Optional node name used in log output; `"<unknown>"` is used when absent.
pub name: Option<String>,
/// Optional keystore, consulted to find out whether the local node holds
/// the key of one of the current authorities.
pub keystore: Option<BareCryptoStorePtr>,
}
impl Config {
    /// Returns the configured node name, or `"<unknown>"` when none was set.
    fn name(&self) -> &str {
        self.name.as_deref().unwrap_or("<unknown>")
    }
}
/// Errors that can occur while doing GRANDPA work.
#[derive(Debug)]
pub enum Error {
/// An error reported by the `finality_grandpa` crate.
Grandpa(GrandpaError),
/// A network-related error, carried as a message.
Network(String),
/// A blockchain lookup error, carried as a message (e.g. a failed
/// block-number lookup).
Blockchain(String),
/// An error reported by the client.
Client(ClientError),
/// A signing-related error, carried as a message.
Signing(String),
/// A safety-related error, carried as a message. Also used when the inner
/// voter future or the voter command channel terminates unexpectedly.
Safety(String),
/// A timer failure.
Timer(io::Error),
}
impl From<GrandpaError> for Error {
fn from(e: GrandpaError) -> Self {
Error::Grandpa(e)
}
}
impl From<ClientError> for Error {
fn from(e: ClientError) -> Self {
Error::Client(e)
}
}
/// Something which can determine the number of a block from its hash.
pub(crate) trait BlockStatus<Block: BlockT> {
/// Returns `Ok(Some(number))` for a known block hash, `Ok(None)` when the
/// lookup yields no number, and `Err(_)` when the lookup itself failed.
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
}
impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client> where
Client: HeaderBackend<Block>,
NumberFor<Block>: BlockNumberOps,
{
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
self.block_number_from_id(&BlockId::Hash(hash))
.map_err(|e| Error::Blockchain(format!("{:?}", e)))
}
}
/// A trait that bundles every client capability required by GRANDPA.
///
/// It declares no methods of its own; it only names the combination of
/// supertraits listed below so that bounds elsewhere can stay short.
pub trait ClientForGrandpa<Block, BE>:
LockImportRun<Block, BE> + Finalizer<Block, BE> + AuxStore
+ HeaderMetadata<Block, Error = sp_blockchain::Error> + HeaderBackend<Block>
+ BlockchainEvents<Block> + ProvideRuntimeApi<Block> + ExecutorProvider<Block>
+ BlockImport<Block, Transaction = TransactionFor<BE, Block>, Error = sp_consensus::Error>
where
BE: Backend<Block>,
Block: BlockT,
{}
// Blanket implementation: any type that satisfies all of the supertrait
// bounds automatically implements `ClientForGrandpa`. The bound list here
// mirrors the supertrait list on the trait declaration and must be kept in
// sync with it.
impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
where
BE: Backend<Block>,
Block: BlockT,
T: LockImportRun<Block, BE> + Finalizer<Block, BE> + AuxStore
+ HeaderMetadata<Block, Error = sp_blockchain::Error> + HeaderBackend<Block>
+ BlockchainEvents<Block> + ProvideRuntimeApi<Block> + ExecutorProvider<Block>
+ BlockImport<Block, Transaction = TransactionFor<BE, Block>, Error = sp_consensus::Error>,
{}
/// Something that can be asked to sync a specific fork from a set of peers.
pub(crate) trait BlockSyncRequester<Block: BlockT> {
/// Requests that the block identified by `hash` and `number` be synced
/// from the given `peers`.
/// NOTE(review): the exact semantics are defined by the network layer,
/// which is not visible here.
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
impl<Block, Network> BlockSyncRequester<Block> for NetworkBridge<Block, Network> where
Block: BlockT,
Network: NetworkT<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
// Forwards to `NetworkBridge::set_sync_fork_request` using an explicit
// type-qualified path.
// NOTE(review): presumably this resolves to an inherent method on
// `NetworkBridge`; if it resolved to this trait method the call would
// recurse forever — confirm before changing the call form.
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
}
}
/// Description of a new authority set, carried by
/// `VoterCommand::ChangeAuthorities`.
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
// Number of the block paired with `canon_hash`; together they are used as
// the base block when the new set's voter state is created.
pub(crate) canon_number: N,
// Hash of that block.
pub(crate) canon_hash: H,
// Identifier of the new set.
pub(crate) set_id: SetId,
// Members of the new set.
pub(crate) authorities: AuthorityList,
}
/// Commands issued to the voter.
#[derive(Debug)]
pub(crate) enum VoterCommand<H, N> {
// Pause the voter; the string is a human-readable reason.
Pause(String),
// Restart the voter with a new authority set.
ChangeAuthorities(NewAuthoritySet<H, N>)
}
impl<H, N> fmt::Display for VoterCommand<H, N> {
    /// Renders a short human-readable description of the command. The
    /// payload of `ChangeAuthorities` is intentionally not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            VoterCommand::Pause(reason) => write!(f, "Pausing voter: {}", reason),
            VoterCommand::ChangeAuthorities(_) => f.write_str("Changing authorities"),
        }
    }
}
/// Signals either an early exit of a voter due to a command or a genuine
/// error. It is used as the error type of the voter's communication streams
/// so that commands can interrupt the voter through the error channel.
#[derive(Debug)]
pub(crate) enum CommandOrError<H, N> {
// A real failure.
Error(Error),
// A command that the voter driver must act upon.
VoterCommand(VoterCommand<H, N>),
}
impl<H, N> From<Error> for CommandOrError<H, N> {
fn from(e: Error) -> Self {
CommandOrError::Error(e)
}
}
impl<H, N> From<ClientError> for CommandOrError<H, N> {
fn from(e: ClientError) -> Self {
CommandOrError::Error(Error::Client(e))
}
}
impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
fn from(e: finality_grandpa::Error) -> Self {
CommandOrError::Error(Error::from(e))
}
}
impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
fn from(e: VoterCommand<H, N>) -> Self {
CommandOrError::VoterCommand(e)
}
}
// Marker implementation so `CommandOrError` can be used wherever a
// `std::error::Error` is required; all methods use their defaults.
impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> { }
impl<H, N> fmt::Display for CommandOrError<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
CommandOrError::Error(ref e) => write!(f, "{:?}", e),
CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
}
}
}
/// Link between the block importer and the background voter.
///
/// It is produced together with the `GrandpaBlockImport` and later handed to
/// `run_grandpa_voter` as part of `GrandpaParams`.
pub struct LinkHalf<Block: BlockT, C, SC> {
client: Arc<C>,
select_chain: SC,
persistent_data: PersistentData<Block>,
// Receiving end of the channel whose sender is given to the block import.
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
justification_sender: GrandpaJustificationSender<Block>,
justification_stream: GrandpaJustificationStream<Block>,
}
impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
    /// Borrows the shared authority set held in the persistent data.
    pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
        let persistent = &self.persistent_data;
        &persistent.authority_set
    }

    /// Returns a new handle to the justification stream.
    pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
        Clone::clone(&self.justification_stream)
    }
}
/// Provider for the GRANDPA authority set configured at genesis.
pub trait GenesisAuthoritySetProvider<Block: BlockT> {
/// Returns the authority list, or a client error if it cannot be obtained.
fn get(&self) -> Result<AuthorityList, ClientError>;
}
impl<Block: BlockT, E> GenesisAuthoritySetProvider<Block> for Arc<dyn ExecutorProvider<Block, Executor = E>>
    where E: CallExecutor<Block>,
{
    /// Calls the `GrandpaApi_grandpa_authorities` runtime entry point at
    /// block number zero and decodes the returned bytes as an
    /// `AuthorityList`.
    ///
    /// Fails with the executor's error if the call fails, or with
    /// `ClientError::CallResultDecode` if the result cannot be decoded.
    fn get(&self) -> Result<AuthorityList, ClientError> {
        let encoded = self.executor().call(
            &BlockId::Number(Zero::zero()),
            "GrandpaApi_grandpa_authorities",
            &[],
            ExecutionStrategy::NativeElseWasm,
            None,
        )?;

        Decode::decode(&mut &encoded[..]).map_err(|err| {
            ClientError::CallResultDecode(
                "failed to decode GRANDPA authorities set proof", err
            )
        })
    }
}
/// Makes block importer and link half necessary to tie the background voter
/// to it.
///
/// Equivalent to `block_import_with_authority_set_hard_forks` called with an
/// empty list of hard forks.
pub fn block_import<BE, Block: BlockT, Client, SC>(
    client: Arc<Client>,
    genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
    select_chain: SC,
) -> Result<
    (
        GrandpaBlockImport<BE, Block, Client, SC>,
        LinkHalf<Block, Client, SC>,
    ),
    ClientError,
>
where
    SC: SelectChain<Block>,
    BE: Backend<Block> + 'static,
    Client: ClientForGrandpa<Block, BE> + 'static,
{
    let no_hard_forks = Vec::new();
    block_import_with_authority_set_hard_forks(
        client,
        genesis_authorities_provider,
        select_chain,
        no_hard_forks,
    )
}
/// Makes block importer and link half necessary to tie the background voter
/// to it, additionally registering the given authority set hard forks.
///
/// Each hard fork is given as `(set_id, (block_hash, block_number),
/// authorities)` and is converted into a zero-delay pending change with
/// `DelayKind::Finalized` before being handed to the block import.
///
/// Persistent GRANDPA data is loaded from the client's auxiliary store; the
/// genesis authorities provider is passed to that loader as a fallback
/// source of the initial authority list.
///
/// Returns a client error if loading the persistent data fails.
pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
    client: Arc<Client>,
    genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
    select_chain: SC,
    authority_set_hard_forks: Vec<(SetId, (Block::Hash, NumberFor<Block>), AuthorityList)>,
) -> Result<
    (
        GrandpaBlockImport<BE, Block, Client, SC>,
        LinkHalf<Block, Client, SC>,
    ),
    ClientError,
>
where
    SC: SelectChain<Block>,
    BE: Backend<Block> + 'static,
    Client: ClientForGrandpa<Block, BE> + 'static,
{
    let genesis_hash = client.info().genesis_hash;

    let persistent_data = aux_schema::load_persistent(
        &*client,
        genesis_hash,
        <NumberFor<Block>>::zero(),
        || {
            let genesis_authorities = genesis_authorities_provider.get()?;
            telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities";
                "authorities_len" => ?genesis_authorities.len()
            );
            Ok(genesis_authorities)
        }
    )?;

    let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");
    let (justification_sender, justification_stream) =
        GrandpaJustificationStream::channel();

    // Translate every user-supplied hard fork into the pending-change form
    // understood by the block import.
    let hard_fork_changes = authority_set_hard_forks
        .into_iter()
        .map(|(set_id, (canon_hash, canon_height), next_authorities)| {
            let change = authorities::PendingChange {
                next_authorities,
                delay: Zero::zero(),
                canon_hash,
                canon_height,
                delay_kind: authorities::DelayKind::Finalized,
            };
            (set_id, change)
        })
        .collect();

    let importer = GrandpaBlockImport::new(
        client.clone(),
        select_chain.clone(),
        persistent_data.authority_set.clone(),
        voter_commands_tx,
        persistent_data.consensus_changes.clone(),
        hard_fork_changes,
        justification_sender.clone(),
    );

    let link = LinkHalf {
        client,
        select_chain,
        persistent_data,
        voter_commands_rx,
        justification_sender,
        justification_stream,
    };

    Ok((importer, link))
}
/// Builds the global (set-wide) communication channels for a voter.
///
/// The incoming half is wrapped so that messages pass through
/// `UntilGlobalMessageBlocksImported` (fed by the client's import
/// notification stream) before being yielded. Both halves have their error
/// type mapped into `CommandOrError`.
///
/// Whether the local node counts as a voter is decided by checking the
/// keystore against the `voters` set.
fn global_communication<BE, Block: BlockT, C, N>(
    set_id: SetId,
    voters: &Arc<VoterSet<AuthorityId>>,
    client: Arc<C>,
    network: &NetworkBridge<Block, N>,
    keystore: Option<&BareCryptoStorePtr>,
    metrics: Option<until_imported::Metrics>,
) -> (
    impl Stream<
        Item = Result<CommunicationInH<Block, Block::Hash>, CommandOrError<Block::Hash, NumberFor<Block>>>,
    >,
    impl Sink<
        CommunicationOutH<Block, Block::Hash>,
        Error = CommandOrError<Block::Hash, NumberFor<Block>>,
    > + Unpin,
) where
    BE: Backend<Block> + 'static,
    C: ClientForGrandpa<Block, BE> + 'static,
    N: NetworkT<Block>,
    NumberFor<Block>: BlockNumberOps,
{
    let local_node_votes = is_voter(voters, keystore).is_some();

    let (raw_in, raw_out) = network.global_communication(
        communication::SetId(set_id),
        voters.clone(),
        local_node_votes,
    );

    // Gate incoming global messages behind the import wrapper.
    let gated_in = UntilGlobalMessageBlocksImported::new(
        client.import_notification_stream(),
        network.clone(),
        client.clone(),
        raw_in,
        "global",
        metrics,
    );

    (
        gated_in.map_err(CommandOrError::from),
        raw_out.sink_map_err(CommandOrError::from),
    )
}
fn register_finality_tracker_inherent_data_provider<Block: BlockT, Client>(
client: Arc<Client>,
inherent_data_providers: &InherentDataProviders,
) -> Result<(), sp_consensus::Error> where
Client: HeaderBackend<Block> + 'static,
{
if !inherent_data_providers.has_provider(&sp_finality_tracker::INHERENT_IDENTIFIER) {
inherent_data_providers
.register_provider(sp_finality_tracker::InherentDataProvider::new(move || {
#[allow(deprecated)]
{
let info = client.info();
telemetry!(CONSENSUS_INFO; "afg.finalized";
"finalized_number" => ?info.finalized_number,
"finalized_hash" => ?info.finalized_hash,
);
Ok(info.finalized_number)
}
}))
.map_err(|err| sp_consensus::Error::InherentData(err))
} else {
Ok(())
}
}
/// Parameters used to run GRANDPA.
pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// The link half obtained when the block import was created.
pub link: LinkHalf<Block, C, SC>,
/// Handle to the network, wrapped in a `NetworkBridge` when the voter starts.
pub network: N,
/// The inherent data providers; the finality tracker provider is
/// registered here if it is not present yet.
pub inherent_data_providers: InherentDataProviders,
/// Optional stream of events; each event received triggers an
/// `afg.authority_set` telemetry message.
pub telemetry_on_connect: Option<TracingUnboundedReceiver<()>>,
/// The voting rule passed on to the voter environment.
pub voting_rule: VR,
/// Optional Prometheus registry used to register metrics.
pub prometheus_registry: Option<prometheus_endpoint::Registry>,
/// Handle that is updated with the state of each newly built voter.
pub shared_voter_state: SharedVoterState,
}
/// Builds the future that runs a GRANDPA voter.
///
/// The returned future drives the voter work together with an optional
/// telemetry task and resolves (to `()`) only once the voter work finishes,
/// which is logged as an error in either outcome.
///
/// Returns an error if registering the finality tracker inherent data
/// provider fails.
pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, SC, VR>(
grandpa_params: GrandpaParams<Block, C, N, SC, VR>,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
Block::Hash: Ord,
BE: Backend<Block> + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
C: ClientForGrandpa<Block, BE> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
{
let GrandpaParams {
mut config,
link,
network,
inherent_data_providers,
telemetry_on_connect,
voting_rule,
prometheus_registry,
shared_voter_state,
} = grandpa_params;
// The observer flag is forced off regardless of what the caller supplied.
// NOTE(review): the rationale is not visible in this file — confirm before
// removing.
config.observer_enabled = false;
let LinkHalf {
client,
select_chain,
persistent_data,
voter_commands_rx,
justification_sender,
justification_stream: _,
} = link;
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
prometheus_registry.as_ref(),
);
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
// Separate copy of the config for the telemetry closure; `config` itself is
// moved into the voter work below.
let conf = config.clone();
// For every event on `telemetry_on_connect`, report the current authority
// set. Without such a stream the task is a future that never resolves.
let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect {
let authorities = persistent_data.authority_set.clone();
let events = telemetry_on_connect
.for_each(move |_| {
let curr = authorities.current_authorities();
let mut auths = curr.iter().map(|(p, _)| p);
let maybe_authority_id = authority_id(&mut auths, conf.keystore.as_ref())
.unwrap_or_default();
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"authority_id" => maybe_authority_id.to_string(),
"authority_set_id" => ?authorities.set_id(),
"authorities" => {
let authorities: Vec<String> = curr.iter()
.map(|(id, _)| id.to_string()).collect();
serde_json::to_string(&authorities)
.expect("authorities is always at least an empty vector; elements are always of type string")
}
);
future::ready(())
});
future::Either::Left(events)
} else {
future::Either::Right(future::pending())
};
let voter_work = VoterWork::new(
client,
config,
network,
select_chain,
voting_rule,
persistent_data,
voter_commands_rx,
prometheus_registry,
shared_voter_state,
justification_sender,
);
// The voter work is not expected to finish; both outcomes are logged as
// errors and mapped to `()`.
let voter_work = voter_work.map(|res| match res {
Ok(()) => error!(target: "afg",
"GRANDPA voter future has concluded naturally, this should be unreachable."
),
Err(e) => error!(target: "afg", "GRANDPA voter error: {:?}", e),
});
// Chain a never-resolving future after the telemetry stream so that the
// end of that stream cannot make the `select` below resolve; only the
// voter work can.
let telemetry_task = telemetry_task
.then(|_| future::pending::<()>());
Ok(future::select(voter_work, telemetry_task).map(drop))
}
// Bundle of the metric sets used by the voter: one for the environment and
// one for the "until imported" message wrapper.
struct Metrics {
environment: environment::Metrics,
until_imported: until_imported::Metrics,
}
impl Metrics {
    /// Registers both metric sets with `registry`, environment metrics
    /// first. Stops at, and returns, the first registration error.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let env_metrics = environment::Metrics::register(registry)?;
        let import_metrics = until_imported::Metrics::register(registry)?;

        Ok(Self {
            environment: env_metrics,
            until_imported: import_metrics,
        })
    }
}
/// Future that powers the voter.
///
/// Besides the inner voter it also polls the voter command channel and the
/// network bridge, rebuilding the inner voter whenever a command arrives.
#[must_use]
struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, SC, VR> {
// The currently running voter, or a never-resolving placeholder while paused.
voter: Pin<Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>>,
// Updated with the new voter's state every time a live voter is rebuilt.
shared_voter_state: SharedVoterState,
env: Arc<Environment<B, Block, C, N, SC, VR>>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: NetworkBridge<Block, N>,
// `None` when no registry was supplied or registration failed.
metrics: Option<Metrics>,
}
impl<B, Block, C, N, SC, VR> VoterWork<B, Block, C, N, SC, VR>
where
Block: BlockT,
B: Backend<Block> + 'static,
C: ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C> + Clone + 'static,
{
fn new(
client: Arc<C>,
config: Config,
network: NetworkBridge<Block, N>,
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
prometheus_registry: Option<prometheus_endpoint::Registry>,
shared_voter_state: SharedVoterState,
justification_sender: GrandpaJustificationSender<Block>,
) -> Self {
let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
Some(Ok(metrics)) => Some(metrics),
Some(Err(e)) => {
debug!(target: "afg", "Failed to register metrics: {:?}", e);
None
}
None => None,
};
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
client,
select_chain,
voting_rule,
voters: Arc::new(voters),
config,
network: network.clone(),
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
voter_set_state: persistent_data.set_state,
metrics: metrics.as_ref().map(|m| m.environment.clone()),
justification_sender: Some(justification_sender),
_phantom: PhantomData,
});
let mut work = VoterWork {
voter: Box::pin(future::pending()),
shared_voter_state,
env,
voter_commands_rx,
network,
metrics,
};
work.rebuild_voter();
work
}
/// Rebuilds the inner voter from the current environment.
///
/// When the voter set state is `Live`, a new `voter::Voter` is created from
/// the last completed round and the last finalized block, and the shared
/// voter state is pointed at it. When the state is `Paused`, the inner voter
/// is replaced by a future that never resolves.
fn rebuild_voter(&mut self) {
debug!(target: "afg", "{}: Starting new voter with set ID {}", self.env.config.name(), self.env.set_id);
// Falls back to the default authority id when the local node holds no key
// of the current voter set.
let authority_id = is_voter(&self.env.voters, self.env.config.keystore.as_ref())
.unwrap_or_default();
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?self.env.config.name(),
"set_id" => ?self.env.set_id,
"authority_id" => authority_id.to_string(),
);
let chain_info = self.env.client.info();
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"number" => ?chain_info.finalized_number,
"hash" => ?chain_info.finalized_hash,
"authority_id" => authority_id.to_string(),
"authority_set_id" => ?self.env.set_id,
"authorities" => {
let authorities: Vec<String> = self.env.voters
.iter().map(|(id, _)| id.to_string()).collect();
serde_json::to_string(&authorities)
.expect("authorities is always at least an empty vector; elements are always of type string")
},
);
// The read guard on the voter set state is held for the whole match.
match &*self.env.voter_set_state.read() {
VoterSetState::Live { completed_rounds, .. } => {
let last_finalized = (
chain_info.finalized_hash,
chain_info.finalized_number,
);
let global_comms = global_communication(
self.env.set_id,
&self.env.voters,
self.env.client.clone(),
&self.env.network,
self.env.config.keystore.as_ref(),
self.metrics.as_ref().map(|m| m.until_imported.clone()),
);
let last_completed_round = completed_rounds.last();
let voter = voter::Voter::new(
self.env.clone(),
(*self.env.voters).clone(),
global_comms,
last_completed_round.number,
last_completed_round.votes.clone(),
last_completed_round.base,
last_finalized,
);
// `reset` gives up after a bounded wait for the lock; in that case
// the previously stored state stays in place.
if self.shared_voter_state.reset(voter.voter_state()).is_none() {
info!(target: "afg",
"Timed out trying to update shared GRANDPA voter state. \
RPC endpoints may return stale data."
);
}
self.voter = Box::pin(voter);
},
VoterSetState::Paused { .. } =>
self.voter = Box::pin(future::pending()),
};
}
fn handle_voter_command(
&mut self,
command: VoterCommand<Block::Hash, NumberFor<Block>>
) -> Result<(), Error> {
match command {
VoterCommand::ChangeAuthorities(new) => {
let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities";
"number" => ?new.canon_number,
"hash" => ?new.canon_hash,
"voters" => ?voters,
"set_id" => ?new.set_id,
);
self.env.update_voter_set_state(|_| {
let set_state = VoterSetState::live(
new.set_id,
&*self.env.authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
let voters = Arc::new(VoterSet::new(new.authorities.into_iter())
.expect("new authorities come from pending change; \
pending change comes from `AuthoritySet`; \
`AuthoritySet` validates authorities is non-empty and weights are non-zero; \
qed."
)
);
self.env = Arc::new(Environment {
voters,
set_id: new.set_id,
voter_set_state: self.env.voter_set_state.clone(),
client: self.env.client.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
consensus_changes: self.env.consensus_changes.clone(),
network: self.env.network.clone(),
voting_rule: self.env.voting_rule.clone(),
metrics: self.env.metrics.clone(),
justification_sender: self.env.justification_sender.clone(),
_phantom: PhantomData,
});
self.rebuild_voter();
Ok(())
}
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
self.env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
self.rebuild_voter();
Ok(())
}
}
}
}
impl<B, Block, C, N, SC, VR> Future for VoterWork<B, Block, C, N, SC, VR>
where
Block: BlockT,
B: Backend<Block> + 'static,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
SC: SelectChain<Block> + 'static,
C: ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
VR: VotingRule<Block, C> + Clone + 'static,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.voter), cx) {
Poll::Pending => {}
Poll::Ready(Ok(())) => {
return Poll::Ready(
Err(Error::Safety("finality-grandpa inner voter has concluded.".into()))
)
}
Poll::Ready(Err(CommandOrError::Error(e))) => {
return Poll::Ready(Err(e))
}
Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
Poll::Pending => {}
Poll::Ready(None) => {
return Poll::Ready(
Err(Error::Safety("`voter_commands_rx` was closed.".into()))
)
}
Poll::Ready(Some(command)) => {
self.handle_voter_command(command)?;
cx.waker().wake_by_ref();
}
}
Future::poll(Pin::new(&mut self.network), cx)
}
}
/// Performs the minimal setup for a node on which GRANDPA is disabled.
///
/// The finality tracker inherent data provider is still registered (if not
/// already present) and the GRANDPA notifications protocol is still
/// registered with the network.
///
/// Returns an error if registering the inherent data provider fails; in
/// that case the protocol is not registered.
pub fn setup_disabled_grandpa<Block: BlockT, Client, N>(
    client: Arc<Client>,
    inherent_data_providers: &InherentDataProviders,
    network: N,
) -> Result<(), sp_consensus::Error> where
    N: NetworkT<Block> + Send + Clone + 'static,
    Client: HeaderBackend<Block> + 'static,
{
    register_finality_tracker_inherent_data_provider(client, inherent_data_providers)?;

    network.register_notifications_protocol(
        communication::GRANDPA_ENGINE_ID,
        communication::GRANDPA_PROTOCOL_NAME.into(),
    );

    Ok(())
}
/// Finds the first member of `voters` whose key is present in the keystore.
///
/// Returns that authority's id, or `None` when no keystore is given or the
/// keystore holds none of the voters' keys.
fn is_voter(
    voters: &Arc<VoterSet<AuthorityId>>,
    keystore: Option<&BareCryptoStorePtr>,
) -> Option<AuthorityId> {
    let keystore = keystore?;

    voters
        .iter()
        .find(|(p, _)| {
            keystore.read()
                .has_keys(&[(p.to_raw_vec(), AuthorityId::ID)])
        })
        .map(|(p, _)| p.clone())
}
/// Finds the first authority yielded by `authorities` whose key is present
/// in the keystore.
///
/// The iterator is advanced up to and including the match. Returns `None`
/// when no keystore is given (the iterator is then left untouched) or when
/// no key matches.
fn authority_id<'a, I>(
    authorities: &mut I,
    keystore: Option<&BareCryptoStorePtr>,
) -> Option<AuthorityId> where
    I: Iterator<Item = &'a AuthorityId>,
{
    let keystore = keystore?;

    authorities
        .find(|p| keystore.read().has_keys(&[(p.to_raw_vec(), AuthorityId::ID)]))
        .cloned()
}