#![warn(missing_docs)]
use std::{
fmt, marker::PhantomData, sync::Arc,
collections::HashSet,
};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use sp_api::{ApiExt, ProvideRuntimeApi};
use futures::future::Future;
use log::{debug, warn};
use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId};
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};
mod api;
use api::SharedClient;
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
/// Network operations made available to offchain workers, in addition to
/// everything [`NetworkStateInfo`] already exposes.
///
/// A trait object of this type (`Arc<dyn NetworkProvider + Send + Sync>`) is
/// what `OffchainWorkers::on_block_imported` accepts.
pub trait NetworkProvider: NetworkStateInfo {
/// Hands `peers` to the network implementation as the set of authorized peers.
///
/// NOTE(review): whether this replaces or extends a previously supplied set
/// is decided by the implementor and is not visible here — confirm.
fn set_authorized_peers(&self, peers: HashSet<PeerId>);
/// Passes the `reserved_only` flag to the network implementation.
///
/// NOTE(review): presumably `true` restricts connections to the authorized
/// peers only — confirm against the network service.
fn set_authorized_only(&self, reserved_only: bool);
}
impl<B, H> NetworkProvider for NetworkService<B, H>
where
B: traits::Block + 'static,
H: ExHashT,
{
fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
self.set_authorized_peers(peers)
}
fn set_authorized_only(&self, reserved_only: bool) {
self.set_authorized_only(reserved_only)
}
}
/// Manager that runs the runtime's offchain worker entry point for imported
/// blocks.
///
/// Created with `OffchainWorkers::new` and driven through
/// `on_block_imported`.
pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
/// Shared handle to the client; the runtime API is obtained from it.
client: Arc<Client>,
/// Offchain storage; a clone is handed to the API object built for each block.
db: Storage,
/// Zero-sized marker that ties this manager to one block type.
_block: PhantomData<Block>,
/// Pool on which the runtime calls are executed, guarded by a mutex.
thread_pool: Mutex<ThreadPool>,
/// Client from the `api` module; a clone is handed to the API object built
/// for each block.
shared_client: SharedClient,
}
impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
pub fn new(client: Arc<Client>, db: Storage) -> Self {
let shared_client = SharedClient::new();
Self {
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
shared_client,
}
}
}
/// Debug output is just the type name; none of the fields are printed, so no
/// `Debug` bound is needed on `Client`, `Storage` or `Block`.
impl<Client, Storage, Block> fmt::Debug for OffchainWorkers<Client, Storage, Block>
where
    Block: traits::Block,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A field-less tuple formatter emits only its name, so write it directly.
        f.write_str("OffchainWorkers")
    }
}
impl<Client, Storage, Block> OffchainWorkers<
Client,
Storage,
Block,
> where
Block: traits::Block,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: OffchainWorkerApi<Block>,
Storage: OffchainStorage + 'static,
{
#[must_use]
pub fn on_block_imported(
&self,
header: &Block::Header,
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
is_validator: bool,
) -> impl Future<Output = ()> {
let runtime = self.client.runtime_api();
let at = BlockId::hash(header.hash());
let has_api_v1 = runtime.has_api_with::<dyn OffchainWorkerApi<Block, Error = ()>, _>(
&at, |v| v == 1
);
let has_api_v2 = runtime.has_api_with::<dyn OffchainWorkerApi<Block, Error = ()>, _>(
&at, |v| v == 2
);
let version = match (has_api_v1, has_api_v2) {
(_, Ok(true)) => 2,
(Ok(true), _) => 1,
err => {
let help = "Consider turning off offchain workers if they are not part of your runtime.";
log::error!("Unsupported Offchain Worker API version: {:?}. {}.", err, help);
0
}
};
debug!("Checking offchain workers at {:?}: version:{}", at, version);
if version > 0 {
let (api, runner) = api::AsyncApi::new(
self.db.clone(),
network_provider,
is_validator,
self.shared_client.clone(),
);
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
let client = self.client.clone();
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
let context = ExecutionContext::OffchainCall(Some(
(api, offchain::Capabilities::all())
));
let run = if version == 2 {
runtime.offchain_worker_with_context(&at, context, &header)
} else {
#[allow(deprecated)]
runtime.offchain_worker_before_version_2_with_context(
&at, context, *header.number()
)
};
if let Err(e) = run {
log::error!("Error running offchain workers at {:?}: {:?}", at, e);
}
});
futures::future::Either::Left(runner.process())
} else {
futures::future::Either::Right(futures::future::ready(()))
}
}
fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
self.thread_pool.lock().execute(f);
}
}
/// Feeds block import notifications from `client` to the offchain workers.
///
/// For every notification flagged `is_new_best`, the future returned by
/// `offchain.on_block_imported` is spawned on `spawner` under the name
/// `"offchain-on-block"`. Other notifications are only logged at debug level.
/// The returned future completes when the notification stream ends.
pub async fn notification_future<Client, Storage, Block, Spawner>(
    is_validator: bool,
    client: Arc<Client>,
    offchain: Arc<OffchainWorkers<Client, Storage, Block>>,
    spawner: Spawner,
    network_provider: Arc<dyn NetworkProvider + Send + Sync>,
)
where
    Block: traits::Block,
    Client: ProvideRuntimeApi<Block> + sc_client_api::BlockchainEvents<Block> + Send + Sync + 'static,
    Client::Api: OffchainWorkerApi<Block>,
    Storage: OffchainStorage + 'static,
    Spawner: SpawnNamed
{
    let imports = client.import_notification_stream();

    imports
        .for_each(move |notification| {
            // Guard clause: anything that is not the new best block is skipped.
            if !notification.is_new_best {
                log::debug!(
                    target: "sc_offchain",
                    "Skipping offchain workers for non-canon block: {:?}",
                    notification.header,
                );
                return ready(());
            }

            let on_block = offchain
                .on_block_imported(
                    &notification.header,
                    network_provider.clone(),
                    is_validator,
                )
                .boxed();
            spawner.spawn("offchain-on-block", on_block);

            ready(())
        })
        .await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use sc_network::{Multiaddr, PeerId};
    use substrate_test_runtime_client::{TestClient, runtime::Block};
    use sc_transaction_pool::{BasicPool, FullChainApi};
    use sp_transaction_pool::{TransactionPool, InPoolTransaction};

    /// Network stub: reports no external addresses and a random peer id.
    struct TestNetwork();

    impl NetworkStateInfo for TestNetwork {
        fn external_addresses(&self) -> Vec<Multiaddr> {
            Vec::new()
        }

        fn local_peer_id(&self) -> PeerId {
            PeerId::random()
        }
    }

    impl NetworkProvider for TestNetwork {
        // The test is not expected to touch peer authorization, so both
        // setters panic if they are ever reached.
        fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
            unimplemented!()
        }

        fn set_authorized_only(&self, _reserved_only: bool) {
            unimplemented!()
        }
    }

    /// Wrapper around a basic transaction pool so that the offchain
    /// submission trait can be implemented for it in this crate.
    struct TestPool(
        Arc<BasicPool<FullChainApi<TestClient, Block>, Block>>
    );

    impl sp_transaction_pool::OffchainSubmitTransaction<Block> for TestPool {
        /// Submits `extrinsic` as a local transaction, blocking until the pool
        /// answers; the pool's result is collapsed into `Ok(())` / `Err(())`.
        fn submit_at(
            &self,
            at: &BlockId<Block>,
            extrinsic: <Block as traits::Block>::Extrinsic,
        ) -> Result<(), ()> {
            let source = sp_transaction_pool::TransactionSource::Local;
            // `at` is already a reference; borrowing it again is unnecessary.
            futures::executor::block_on(self.0.submit_one(at, source, extrinsic))
                .map(|_| ())
                .map_err(|_| ())
        }
    }

    #[test]
    fn should_call_into_runtime_and_produce_extrinsic() {
        sp_tracing::try_init_simple();

        // given
        let client = Arc::new(substrate_test_runtime_client::new());
        let spawner = sp_core::testing::TaskExecutor::new();
        // NOTE(review): nothing visible in this test hands `pool` to `client`
        // or to the workers. Confirm how an extrinsic submitted by the runtime
        // is expected to reach this pool; without that wiring the assertions
        // below cannot observe it.
        let pool = TestPool(BasicPool::new_full(
            Default::default(),
            None,
            spawner,
            client.clone(),
        ));
        let db = sc_client_db::offchain::LocalStorage::new_test();
        let network = Arc::new(TestNetwork());
        let header = client.header(&BlockId::number(0)).unwrap().unwrap();

        // when
        let offchain = OffchainWorkers::new(client, db);
        futures::executor::block_on(
            offchain.on_block_imported(&header, network, false)
        );

        // then: exactly one ready transaction, and it must not be propagated.
        assert_eq!(pool.0.status().ready, 1);
        assert!(!pool.0.ready().next().unwrap().is_propagable());
    }
}