mod chain_full;
mod chain_light;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use futures::{future, StreamExt, TryStreamExt};
use log::warn;
use rpc::{
Result as RpcResult,
futures::{stream, Future, Sink, Stream},
};
use sc_client_api::{BlockchainEvents, light::{Fetcher, RemoteBlockchain}};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use sp_rpc::{number::NumberOrHex, list::ListOrValue};
use sp_runtime::{
generic::{BlockId, SignedBlock},
traits::{Block as BlockT, Header, NumberFor},
};
use self::error::{Result, Error, FutureResult};
pub use sc_rpc_api::chain::*;
use sp_blockchain::HeaderBackend;
use sc_client_api::BlockBackend;
/// Backend abstraction used by the chain RPC.
///
/// Implementors provide access to the client, the subscription manager and
/// the header/block fetchers; everything else has a default implementation
/// built on top of those.
trait ChainBackend<Client, Block: BlockT>: Send + Sync + 'static
where
    Block: BlockT + 'static,
    Client: HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
    /// Returns the client handle the default methods query.
    fn client(&self) -> &Arc<Client>;

    /// Returns the manager that subscriptions are registered with and cancelled through.
    fn subscriptions(&self) -> &SubscriptionManager;

    /// Returns `hash` when one is given, otherwise the client's current best block hash.
    fn unwrap_or_best(&self, hash: Option<Block::Hash>) -> Block::Hash {
        // The client is only queried when no hash was supplied.
        hash.unwrap_or_else(|| self.client().info().best_hash)
    }

    /// Fetches the header for the given block hash.
    ///
    /// NOTE(review): the meaning of `None` is decided by the implementor;
    /// presumably it resolves to the best block via `unwrap_or_best` — confirm
    /// against the implementations.
    fn header(&self, hash: Option<Block::Hash>) -> FutureResult<Option<Block::Header>>;

    /// Fetches the signed block for the given block hash.
    ///
    /// NOTE(review): as with `header`, the handling of `None` is up to the
    /// implementor — confirm against the implementations.
    fn block(&self, hash: Option<Block::Hash>) -> FutureResult<Option<SignedBlock<Block>>>;

    /// Resolves a block number to a block hash.
    ///
    /// * `None` yields the client's best block hash.
    /// * `Some(number)` looks up the header with that number and yields its
    ///   hash, or `None` when the client has no such header.
    ///
    /// # Errors
    ///
    /// Fails when `number` does not fit into a `u32`, or when the client's
    /// header lookup reports an error.
    fn block_hash(&self, number: Option<NumberOrHex>) -> Result<Option<Block::Hash>> {
        match number {
            None => Ok(Some(self.client().info().best_hash)),
            Some(num_or_hex) => {
                use std::convert::TryInto;

                // Block numbers are funnelled through `u32` here, so larger
                // inputs are rejected with an explicit error.
                let block_num: u32 = num_or_hex.try_into().map_err(|_| {
                    Error::from(format!(
                        "`{:?}` > u32::max_value(), the max block number is u32.",
                        num_or_hex
                    ))
                })?;
                let block_num = <NumberFor<Block>>::from(block_num);
                Ok(self
                    .client()
                    .header(BlockId::number(block_num))
                    .map_err(client_err)?
                    .map(|h| h.hash()))
            }
        }
    }

    /// Returns the hash of the last finalized block as reported by the client.
    fn finalized_head(&self) -> Result<Block::Hash> {
        Ok(self.client().info().finalized_hash)
    }

    /// Subscribes to the headers of all imported blocks.
    ///
    /// The subscriber first receives the current best header, then the header
    /// of every import notification.
    fn subscribe_all_heads(
        &self,
        _metadata: crate::Metadata,
        subscriber: Subscriber<Block::Header>,
    ) {
        subscribe_headers(
            self.client(),
            self.subscriptions(),
            subscriber,
            || self.client().info().best_hash,
            || self.client().import_notification_stream()
                .map(|notification| Ok::<_, ()>(notification.header))
                .compat(),
        )
    }

    /// Cancels an all-heads subscription; the result of the manager's
    /// `cancel` call is returned as-is.
    fn unsubscribe_all_heads(
        &self,
        _metadata: Option<crate::Metadata>,
        id: SubscriptionId,
    ) -> RpcResult<bool> {
        Ok(self.subscriptions().cancel(id))
    }

    /// Subscribes to the headers of blocks that become the new best block.
    ///
    /// The subscriber first receives the current best header, then the header
    /// of every import notification flagged `is_new_best`.
    fn subscribe_new_heads(
        &self,
        _metadata: crate::Metadata,
        subscriber: Subscriber<Block::Header>,
    ) {
        subscribe_headers(
            self.client(),
            self.subscriptions(),
            subscriber,
            || self.client().info().best_hash,
            || self.client().import_notification_stream()
                // Only notifications that changed the best block are forwarded.
                .filter(|notification| future::ready(notification.is_new_best))
                .map(|notification| Ok::<_, ()>(notification.header))
                .compat(),
        )
    }

    /// Cancels a new-heads subscription; the result of the manager's
    /// `cancel` call is returned as-is.
    fn unsubscribe_new_heads(
        &self,
        _metadata: Option<crate::Metadata>,
        id: SubscriptionId,
    ) -> RpcResult<bool> {
        Ok(self.subscriptions().cancel(id))
    }

    /// Subscribes to the headers of finalized blocks.
    ///
    /// The subscriber first receives the header of the currently finalized
    /// block, then the header of every finality notification.
    fn subscribe_finalized_heads(
        &self,
        _metadata: crate::Metadata,
        subscriber: Subscriber<Block::Header>,
    ) {
        subscribe_headers(
            self.client(),
            self.subscriptions(),
            subscriber,
            || self.client().info().finalized_hash,
            || self.client().finality_notification_stream()
                .map(|notification| Ok::<_, ()>(notification.header))
                .compat(),
        )
    }

    /// Cancels a finalized-heads subscription; the result of the manager's
    /// `cancel` call is returned as-is.
    fn unsubscribe_finalized_heads(
        &self,
        _metadata: Option<crate::Metadata>,
        id: SubscriptionId,
    ) -> RpcResult<bool> {
        Ok(self.subscriptions().cancel(id))
    }
}
/// Builds a [`Chain`] whose backend is a `chain_full::FullChain` constructed
/// from the given client and subscription manager.
pub fn new_full<Block: BlockT, Client>(
    client: Arc<Client>,
    subscriptions: SubscriptionManager,
) -> Chain<Block, Client>
where
    Block: BlockT + 'static,
    Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
    let full_backend = self::chain_full::FullChain::new(client, subscriptions);
    // Boxing at the field position erases the concrete backend type.
    Chain { backend: Box::new(full_backend) }
}
/// Builds a [`Chain`] whose backend is a `chain_light::LightChain` constructed
/// from the given client, subscription manager, remote blockchain and fetcher.
pub fn new_light<Block: BlockT, Client, F: Fetcher<Block>>(
    client: Arc<Client>,
    subscriptions: SubscriptionManager,
    remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
    fetcher: Arc<F>,
) -> Chain<Block, Client>
where
    Block: BlockT + 'static,
    Client: BlockBackend<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
    F: Send + Sync + 'static,
{
    let light_backend = self::chain_light::LightChain::new(
        client,
        subscriptions,
        remote_blockchain,
        fetcher,
    );
    // Boxing at the field position erases the concrete backend type.
    Chain { backend: Box::new(light_backend) }
}
/// Chain RPC handler.
///
/// Holds a boxed `ChainBackend` trait object; the `ChainApi` implementation
/// for this type forwards its calls to that backend. Instances are created
/// by `new_full` and `new_light`.
pub struct Chain<Block: BlockT, Client> {
/// Type-erased backend that serves the requests.
backend: Box<dyn ChainBackend<Client, Block>>,
}
impl<Block, Client> ChainApi<NumberFor<Block>, Block::Hash, Block::Header, SignedBlock<Block>> for
Chain<Block, Client>
where
Block: BlockT + 'static,
Client: HeaderBackend<Block> + BlockchainEvents<Block> + 'static,
{
type Metadata = crate::Metadata;
fn header(&self, hash: Option<Block::Hash>) -> FutureResult<Option<Block::Header>> {
self.backend.header(hash)
}
fn block(&self, hash: Option<Block::Hash>) -> FutureResult<Option<SignedBlock<Block>>>
{
self.backend.block(hash)
}
fn block_hash(
&self,
number: Option<ListOrValue<NumberOrHex>>,
) -> Result<ListOrValue<Option<Block::Hash>>> {
match number {
None => self.backend.block_hash(None).map(ListOrValue::Value),
Some(ListOrValue::Value(number)) => self.backend.block_hash(Some(number)).map(ListOrValue::Value),
Some(ListOrValue::List(list)) => Ok(ListOrValue::List(list
.into_iter()
.map(|number| self.backend.block_hash(Some(number)))
.collect::<Result<_>>()?
))
}
}
fn finalized_head(&self) -> Result<Block::Hash> {
self.backend.finalized_head()
}
fn subscribe_all_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
self.backend.subscribe_all_heads(metadata, subscriber)
}
fn unsubscribe_all_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> {
self.backend.unsubscribe_all_heads(metadata, id)
}
fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
self.backend.subscribe_new_heads(metadata, subscriber)
}
fn unsubscribe_new_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> {
self.backend.unsubscribe_new_heads(metadata, id)
}
fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber<Block::Header>) {
self.backend.subscribe_finalized_heads(metadata, subscriber)
}
fn unsubscribe_finalized_heads(&self, metadata: Option<Self::Metadata>, id: SubscriptionId) -> RpcResult<bool> {
self.backend.unsubscribe_finalized_heads(metadata, id)
}
}
/// Registers `subscriber` with `subscriptions` and feeds it headers.
///
/// The sink first receives the result of looking up the header at
/// `best_block_hash()` (an error item if the lookup fails or the header is
/// missing), followed by every header produced by `stream()`. Stream and sink
/// errors are logged with `warn!`.
fn subscribe_headers<Block, Client, F, G, S, ERR>(
    client: &Arc<Client>,
    subscriptions: &SubscriptionManager,
    subscriber: Subscriber<Block::Header>,
    best_block_hash: G,
    stream: F,
) where
    Block: BlockT + 'static,
    Client: HeaderBackend<Block> + 'static,
    F: FnOnce() -> S,
    G: FnOnce() -> Block::Hash,
    ERR: ::std::fmt::Debug,
    S: Stream<Item=Block::Header, Error=ERR> + Send + 'static,
{
    subscriptions.add(subscriber, |sink| {
        // Resolve the starting header so the subscriber hears about it first.
        let initial_header = client
            .header(BlockId::Hash(best_block_hash()))
            .map_err(client_err)
            .and_then(|maybe_header| {
                maybe_header.ok_or_else(|| "Best header missing.".to_owned().into())
            })
            .map_err(Into::into);

        // Every later notification is wrapped in `Ok`; stream failures are only logged.
        let updates = stream()
            .map(Ok)
            .map_err(|e| warn!("Block notification stream error: {:?}", e));

        let first_item = stream::iter_result(vec![Ok(initial_header)]);

        sink
            .sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
            .send_all(first_item.chain(updates))
            // The value produced by `send_all` is not needed, only its completion.
            .map(|_| ())
    });
}
fn client_err(err: sp_blockchain::Error) -> Error {
Error::Client(Box::new(err))
}