use crate::{
ExHashT, NetworkStateInfo, NetworkStatus,
behaviour::{self, Behaviour, BehaviourOut},
config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
discovery::DiscoveryConfig,
error::Error,
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
on_demand_layer::AlwaysBadChecker,
light_client_handler, block_requests, finality_requests,
protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol},
transport, ReputationChange,
};
use futures::{channel::oneshot, prelude::*};
use libp2p::{PeerId, multiaddr, Multiaddr};
use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError};
use libp2p::kad::record;
use libp2p::ping::handler::PingFailure;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError};
use log::{error, info, trace, warn};
use metrics::{Metrics, MetricSources, Histogram, HistogramVec};
use parking_lot::Mutex;
use sc_peerset::PeersetHandle;
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
ConsensusEngineId,
};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
borrow::{Borrow, Cow},
collections::{HashMap, HashSet},
fs,
marker::PhantomData,
num:: NonZeroUsize,
pin::Pin,
str,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::Poll,
};
use wasm_timer::Instant;
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
mod metrics;
mod out_events;
#[cfg(test)]
mod tests;
/// Substrate network service. Handle that allows interacting with the network.
///
/// This is a cheap handle (the fields are `Arc`s, atomics, and channel ends);
/// the actual networking is driven by the associated `NetworkWorker`, which is
/// reached through the `to_worker` channel.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// Number of peers we're currently connected to. Shared with the worker.
num_connected: Arc<AtomicUsize>,
/// The external addresses this node is reachable on. Shared with the worker.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Whether we are currently performing a major sync. Shared with the worker.
is_major_syncing: Arc<AtomicBool>,
/// Identity of the local node on the network.
local_peer_id: PeerId,
/// Bandwidth counters (total bytes in/out), shared with the transport layer.
bandwidth: Arc<transport::BandwidthSinks>,
/// Handle to the peerset; used to report peers and manage reserved peers.
peerset: PeersetHandle,
/// Channel for sending instructions to the background `NetworkWorker`.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B, H>>,
/// For each `(peer, engine_id)` pair, a sink that sends notifications
/// directly to that peer for that protocol. Shared with the worker, which
/// inserts/replaces/removes entries as substreams open and close.
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
/// Maps each registered notifications engine ID to its protocol name.
protocol_name_by_engine: Mutex<HashMap<ConsensusEngineId, Cow<'static, str>>>,
/// Histogram of outbound notification sizes, if a metrics registry was given.
notifications_sizes_metric: Option<HistogramVec>,
/// Marker for the otherwise-unused `H` type parameter.
_marker: PhantomData<H>,
}
impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
pub fn new(params: Params<B, H>) -> Result<NetworkWorker<B, H>, Error> {
ensure_addresses_consistent_with_transport(
params.network_config.listen_addresses.iter(),
¶ms.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params.network_config.boot_nodes.iter().map(|x| &x.multiaddr),
¶ms.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params.network_config.reserved_nodes.iter().map(|x| &x.multiaddr),
¶ms.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params.network_config.public_addresses.iter(),
¶ms.network_config.transport,
)?;
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");
if let Some(path) = params.network_config.net_config_path {
fs::create_dir_all(&path)?;
}
let mut known_addresses = Vec::new();
let mut bootnodes = Vec::new();
let mut boot_node_ids = HashSet::new();
for bootnode in params.network_config.boot_nodes.iter() {
bootnodes.push(bootnode.peer_id.clone());
boot_node_ids.insert(bootnode.peer_id.clone());
known_addresses.push((bootnode.peer_id.clone(), bootnode.multiaddr.clone()));
}
let boot_node_ids = Arc::new(boot_node_ids);
known_addresses.iter()
.try_for_each(|(peer_id, addr)|
if let Some(other) = known_addresses
.iter()
.find(|o| o.1 == *addr && o.0 != *peer_id)
{
Err(Error::DuplicateBootnode {
address: addr.clone(),
first_id: peer_id.clone(),
second_id: other.0.clone(),
})
} else {
Ok(())
}
)?;
let priority_groups = {
let mut reserved_nodes = HashSet::new();
for reserved in params.network_config.reserved_nodes.iter() {
reserved_nodes.insert(reserved.peer_id.clone());
known_addresses.push((reserved.peer_id.clone(), reserved.multiaddr.clone()));
}
let print_deprecated_message = match ¶ms.role {
Role::Sentry { .. } => true,
Role::Authority { sentry_nodes } if !sentry_nodes.is_empty() => true,
_ => false,
};
if print_deprecated_message {
log::warn!(
"🙇 Sentry nodes are deprecated, and the `--sentry` and `--sentry-nodes` \
CLI options will eventually be removed in a future version. The Substrate \
and Polkadot networking protocol require validators to be \
publicly-accessible. Please do not block access to your validator nodes. \
For details, see https://github.com/paritytech/substrate/issues/6845."
);
}
let mut sentries_and_validators = HashSet::new();
match ¶ms.role {
Role::Sentry { validators } => {
for validator in validators {
sentries_and_validators.insert(validator.peer_id.clone());
reserved_nodes.insert(validator.peer_id.clone());
known_addresses.push((validator.peer_id.clone(), validator.multiaddr.clone()));
}
}
Role::Authority { sentry_nodes } => {
for sentry_node in sentry_nodes {
sentries_and_validators.insert(sentry_node.peer_id.clone());
reserved_nodes.insert(sentry_node.peer_id.clone());
known_addresses.push((sentry_node.peer_id.clone(), sentry_node.multiaddr.clone()));
}
}
_ => {}
}
vec![
("reserved".to_owned(), reserved_nodes),
("sentries_and_validators".to_owned(), sentries_and_validators),
]
};
let peerset_config = sc_peerset::PeersetConfig {
in_peers: params.network_config.in_peers,
out_peers: params.network_config.out_peers,
bootnodes,
reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny,
priority_groups,
};
let local_identity = params.network_config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.clone().into_peer_id();
let local_peer_id_legacy = bs58::encode(Borrow::<[u8]>::borrow(&local_peer_id)).into_string();
info!(
target: "sub-libp2p",
"🏷 Local node identity is: {} (legacy representation: {})",
local_peer_id.to_base58(),
local_peer_id_legacy
);
let checker = params.on_demand.as_ref()
.map(|od| od.checker().clone())
.unwrap_or_else(|| Arc::new(AlwaysBadChecker));
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let (protocol, peerset_handle) = Protocol::new(
protocol::ProtocolConfig {
roles: From::from(¶ms.role),
max_parallel_downloads: params.network_config.max_parallel_downloads,
},
local_peer_id.clone(),
params.chain.clone(),
params.transaction_pool,
params.finality_proof_request_builder,
params.protocol_id.clone(),
peerset_config,
params.block_announce_validator,
params.metrics_registry.as_ref(),
boot_node_ids.clone(),
)?;
let (mut swarm, bandwidth): (Swarm<B, H>, _) = {
let user_agent = format!(
"{} ({})",
params.network_config.client_version,
params.network_config.node_name
);
let block_requests = {
let config = block_requests::Config::new(¶ms.protocol_id);
block_requests::BlockRequests::new(config, params.chain.clone())
};
let finality_proof_requests = {
let config = finality_requests::Config::new(¶ms.protocol_id);
finality_requests::FinalityProofRequests::new(config, params.finality_proof_provider.clone())
};
let light_client_handler = {
let config = light_client_handler::Config::new(¶ms.protocol_id);
light_client_handler::LightClientHandler::new(
config,
params.chain,
checker,
peerset_handle.clone(),
)
};
let discovery_config = {
let mut config = DiscoveryConfig::new(local_public.clone());
config.with_user_defined(known_addresses);
config.discovery_limit(u64::from(params.network_config.out_peers) + 15);
config.add_protocol(params.protocol_id.clone());
config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht);
match params.network_config.transport {
TransportConfig::MemoryOnly => {
config.with_mdns(false);
config.allow_private_ipv4(false);
}
TransportConfig::Normal { enable_mdns, allow_private_ipv4, .. } => {
config.with_mdns(enable_mdns);
config.allow_private_ipv4(allow_private_ipv4);
}
}
config
};
let mut behaviour = {
let result = Behaviour::new(
protocol,
params.role,
user_agent,
local_public,
block_requests,
finality_proof_requests,
light_client_handler,
discovery_config,
params.network_config.request_response_protocols,
);
match result {
Ok(b) => b,
Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
return Err(Error::DuplicateRequestResponseProtocol {
protocol: proto,
})
},
}
};
for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
}
let (transport, bandwidth) = {
let (config_mem, config_wasm, flowctrl) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None, false),
TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } =>
(false, wasm_external_transport, use_yamux_flow_control)
};
transport::build_transport(local_identity, config_mem, config_wasm, flowctrl)
};
let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.peer_connection_limit(crate::MAX_CONNECTIONS_PER_PEER)
.notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
.connection_event_buffer_size(1024);
if let Some(spawner) = params.executor {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
builder = builder.executor(Box::new(SpawnImpl(spawner)));
}
(builder.build(), bandwidth)
};
let metrics = match ¶ms.metrics_registry {
Some(registry) => {
Some(metrics::register(registry, MetricSources {
bandwidth: bandwidth.clone(),
major_syncing: is_major_syncing.clone(),
connected_peers: num_connected.clone(),
})?)
}
None => None
};
for addr in ¶ms.network_config.listen_addresses {
if let Err(err) = Swarm::<B, H>::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
for addr in ¶ms.network_config.public_addresses {
Swarm::<B, H>::add_external_address(&mut swarm, addr.clone());
}
let external_addresses = Arc::new(Mutex::new(Vec::new()));
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
let protocol_name_by_engine = Mutex::new({
params.network_config.notifications_protocols.iter().cloned().collect()
});
let service = Arc::new(NetworkService {
bandwidth,
external_addresses: external_addresses.clone(),
num_connected: num_connected.clone(),
is_major_syncing: is_major_syncing.clone(),
peerset: peerset_handle,
local_peer_id,
to_worker,
peers_notifications_sinks: peers_notifications_sinks.clone(),
protocol_name_by_engine,
notifications_sizes_metric:
metrics.as_ref().map(|metrics| metrics.notifications_sizes.clone()),
_marker: PhantomData,
});
Ok(NetworkWorker {
external_addresses,
num_connected,
is_major_syncing,
network_service: swarm,
service,
import_queue: params.import_queue,
from_service,
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
peers_notifications_sinks,
metrics,
boot_node_ids,
pending_requests: HashMap::with_capacity(128),
})
}
/// Returns a snapshot of the network's overall status.
pub fn status(&self) -> NetworkStatus<B> {
	// Sample each metric individually, then assemble the report struct.
	let sync_state = self.sync_state();
	let best_seen_block = self.best_seen_block();
	let num_sync_peers = self.num_sync_peers();
	let num_connected_peers = self.num_connected_peers();
	let num_active_peers = self.num_active_peers();
	let total_bytes_inbound = self.total_bytes_inbound();
	let total_bytes_outbound = self.total_bytes_outbound();
	NetworkStatus {
		sync_state,
		best_seen_block,
		num_sync_peers,
		num_connected_peers,
		num_active_peers,
		total_bytes_inbound,
		total_bytes_outbound,
	}
}
/// Returns the total number of bytes received so far.
pub fn total_bytes_inbound(&self) -> u64 {
self.service.bandwidth.total_inbound()
}
/// Returns the total number of bytes sent so far.
pub fn total_bytes_outbound(&self) -> u64 {
self.service.bandwidth.total_outbound()
}
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.network_service.user_protocol().num_connected_peers()
}
/// Returns the number of peers we're connected to and that are being queried.
pub fn num_active_peers(&self) -> usize {
self.network_service.user_protocol().num_active_peers()
}
/// Current global &`SyncState` of the sync machinery.
pub fn sync_state(&self) -> SyncState {
self.network_service.user_protocol().sync_state()
}
/// Target sync block number, if known.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.network_service.user_protocol().best_seen_block()
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.network_service.user_protocol().num_sync_peers()
}
/// Number of blocks in the import queue.
pub fn num_queued_blocks(&self) -> u32 {
self.network_service.user_protocol().num_queued_blocks()
}
/// Number of downloaded blocks.
pub fn num_downloaded_blocks(&self) -> usize {
self.network_service.user_protocol().num_downloaded_blocks()
}
/// Number of active sync requests.
pub fn num_sync_requests(&self) -> usize {
self.network_service.user_protocol().num_sync_requests()
}
/// Adds an address known to belong to the given peer.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.network_service.add_known_address(peer_id, addr);
}
/// Returns the `NetworkService` handle that allows interacting with this worker.
pub fn service(&self) -> &Arc<NetworkService<B, H>> {
&self.service
}
/// Notifies the networking protocol that a block has been finalized.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: B::Header) {
self.network_service.user_protocol_mut().on_block_finalized(hash, &header);
}
/// Informs the protocol that the chain state has changed.
pub fn update_chain(&mut self) {
self.network_service.user_protocol_mut().update_chain();
}
/// Returns the local `PeerId` of the swarm.
pub fn local_peer_id(&self) -> &PeerId {
Swarm::<B, H>::local_peer_id(&self.network_service)
}
/// Returns the multiaddresses the swarm is currently listening on.
pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
Swarm::<B, H>::listeners(&self.network_service)
}
/// Builds a debug snapshot of the current network state: connected and
/// not-connected peers, listened and external addresses, and peerset info.
pub fn network_state(&mut self) -> NetworkState {
let swarm = &mut self.network_service;
// Peers with an open custom-protocol substream.
let open = swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>();
let connected_peers = {
// Re-borrow so the closure captures a reborrow, not `swarm` itself.
let swarm = &mut *swarm;
open.iter().filter_map(move |peer_id| {
let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id)
.into_iter().collect();
// A peer open at the protocol level should also be known to the
// swarm's debug info; log an error and skip it otherwise.
let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) {
e.clone().into()
} else {
error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \
and debug information about {:?}", peer_id);
return None
};
Some((peer_id.to_base58(), NetworkStatePeer {
endpoint,
version_string: swarm.node(peer_id)
.and_then(|i| i.client_version().map(|s| s.to_owned())),
latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()),
enabled: swarm.user_protocol().is_enabled(&peer_id),
open: swarm.user_protocol().is_open(&peer_id),
known_addresses,
}))
}).collect()
};
let not_connected_peers = {
let swarm = &mut *swarm;
// All known peers minus the ones with an open substream.
swarm.known_peers().into_iter()
.filter(|p| open.iter().all(|n| n != p))
.map(move |peer_id| {
(peer_id.to_base58(), NetworkStateNotConnectedPeer {
version_string: swarm.node(&peer_id)
.and_then(|i| i.client_version().map(|s| s.to_owned())),
latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()),
known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id)
.into_iter().collect(),
})
})
.collect()
};
NetworkState {
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
}
}
/// Returns a clone of the (peer id, peer info) pairs known to the protocol.
pub fn peers_debug_info(&mut self) -> Vec<(PeerId, PeerInfo<B>)> {
self.network_service.user_protocol_mut()
.peers_info()
.map(|(id, info)| (id.clone(), info.clone()))
.collect()
}
/// Removes a peer from the reserved set, via the service handle.
pub fn remove_reserved_peer(&self, peer: PeerId) {
self.service.remove_reserved_peer(peer);
}
/// Adds a `PeerId`-and-address string to the reserved set, via the service handle.
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.service.add_reserved_peer(peer)
}
}
impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// Returns the local `PeerId`.
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
/// Sets the authorized peers. Implemented on top of the peerset's
/// reserved-peers mechanism.
pub fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
self.peerset.set_reserved_peers(peers)
}
/// Sets whether only authorized (reserved) peers may connect.
pub fn set_authorized_only(&self, reserved_only: bool) {
self.peerset.set_reserved_only(reserved_only)
}
/// Sends a notification to the given peer on the given protocol.
///
/// Silently discarded if there is no notifications sink for this
/// `(target, engine_id)` pair (peer not connected / substream not open) or
/// if no protocol name is registered for `engine_id`.
pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
// Clone the sink inside a short-lived lock so we don't hold the mutex
// while sending.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
sink.clone()
} else {
return;
}
};
// Record the length before `message` is moved into the send call.
let message_len = message.len();
let protocol_name = self.protocol_name_by_engine.lock().get(&engine_id).cloned();
if let Some(protocol_name) = protocol_name {
sink.send_sync_notification(protocol_name, message);
} else {
return;
}
// Only reached when the notification was actually handed to the sink.
if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() {
notifications_sizes_metric
.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
.observe(message_len as f64);
}
}
/// Obtains a `NotificationSender` for the given peer and protocol, which can
/// later be used to reserve a slot and send a notification asynchronously.
///
/// # Errors
///
/// Returns `Closed` if no sink exists for `(target, engine_id)`, and
/// `BadProtocol` if no protocol name is registered for `engine_id`.
pub fn notification_sender(
&self,
target: PeerId,
engine_id: ConsensusEngineId,
) -> Result<NotificationSender, NotificationSenderError> {
// Clone the sink under a short-lived lock.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
sink.clone()
} else {
return Err(NotificationSenderError::Closed);
}
};
let protocol_name = match self.protocol_name_by_engine.lock().get(&engine_id).cloned() {
Some(p) => p,
None => return Err(NotificationSenderError::BadProtocol),
};
Ok(NotificationSender {
sink,
protocol_name,
notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| {
histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
}),
})
}
/// Returns a stream of network events. `name` is used for diagnostics of the
/// underlying channel.
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
let (tx, rx) = out_events::channel(name);
// If the worker is gone the send fails and the stream simply ends.
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
/// Sends a single targeted request to a peer over a request-response
/// protocol and waits for the response.
///
/// # Errors
///
/// Resolves to a `RequestFailure` if the request could not be sent or the
/// worker dropped the response channel (treated as a closed connection).
pub async fn request(
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>
) -> Result<Vec<u8>, RequestFailure> {
let (tx, rx) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx
});
match rx.await {
Ok(v) => v,
// The worker dropped the sender without answering.
Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
}
}
/// Registers a new notifications protocol, mapping `engine_id` to
/// `protocol_name` locally and instructing the worker to register it too.
pub fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
) {
let protocol_name = protocol_name.into();
// Record the mapping locally first so `write_notification` can resolve it.
self.protocol_name_by_engine.lock().insert(engine_id, protocol_name.clone());
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol {
engine_id,
protocol_name,
});
}
/// Asks the worker to re-propagate all pending transactions.
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions);
}
/// Asks the worker to propagate the transaction with the given hash.
pub fn propagate_transaction(&self, hash: H) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash));
}
/// Asks the worker to announce a block, with optional associated data.
pub fn announce_block(&self, hash: B::Hash, data: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data));
}
/// Reports a peer's behaviour (positive or negative) to the peerset.
pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
self.peerset.report_peer(who, cost_benefit);
}
/// Asks the worker to disconnect from the given peer.
pub fn disconnect_peer(&self, who: PeerId) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who));
}
/// Asks the worker to request a justification for the given block.
pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::RequestJustification(*hash, number));
}
/// Returns whether we are currently performing a major sync.
pub fn is_major_syncing(&self) -> bool {
self.is_major_syncing.load(Ordering::Relaxed)
}
/// Starts a DHT lookup for the given key; the result arrives as a `Dht` event.
pub fn get_value(&self, key: &record::Key) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
}
/// Starts putting the given key/value pair into the DHT.
pub fn put_value(&self, key: record::Key, value: Vec<u8>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
}
/// Allows connections from non-reserved peers.
pub fn accept_unreserved_peers(&self) {
self.peerset.set_reserved_only(false);
}
/// Restricts connections to reserved peers only.
pub fn deny_unreserved_peers(&self) {
self.peerset.set_reserved_only(true);
}
/// Removes a peer from the reserved set.
pub fn remove_reserved_peer(&self, peer: PeerId) {
self.peerset.remove_reserved_peer(peer);
}
/// Parses `peer` as a `PeerId`-bearing multiaddress string and adds it to
/// the reserved set. Rejects the local peer ID.
pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?;
if peer_id == self.local_peer_id {
return Err("Local peer ID cannot be added as a reserved peer.".to_string())
}
self.peerset.add_reserved_peer(peer_id.clone());
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
Ok(())
}
/// Asks the worker to configure a sync-fork request towards the given peers.
pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number));
}
/// Replaces the contents of the named peerset priority group with the given
/// addresses. Each address must end with a `/p2p/<peer-id>` component, which
/// is popped off to obtain the `PeerId`; the remainder is registered as a
/// known address for that peer.
///
/// # Errors
///
/// Returns a `String` error if an address lacks a trailing `PeerId`, the
/// `PeerId` is malformed, or it is our own local peer ID.
pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
let peers = peers.into_iter()
.map(|mut addr| {
// Pop the trailing component; it must be a `/p2p/...` protocol.
let peer = match addr.pop() {
Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key)
.map_err(|_| "Invalid PeerId format".to_string())?,
_ => return Err("Missing PeerId from address".to_string()),
};
if peer == self.local_peer_id {
Err("Local peer ID in priority group.".to_string())
} else {
Ok((peer, addr))
}
})
.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()?;
let peer_ids = peers.iter().map(|(peer_id, _addr)| peer_id.clone()).collect();
self.peerset.set_priority_group(group_id, peer_ids);
// Tell the worker about the addresses so dialing can succeed.
for (peer_id, addr) in peers.into_iter() {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
}
Ok(())
}
/// Returns the number of peers we're currently connected to.
pub fn num_connected(&self) -> usize {
self.num_connected.load(Ordering::Relaxed)
}
/// Notifies the worker that the chain state has changed.
pub fn update_chain(&self) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::UpdateChain);
}
/// Notifies the worker that one of our own blocks has been imported.
pub fn own_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::OwnBlockImported(hash, number));
}
}
/// `SyncOracle` implementation so consensus code can query sync status.
impl<B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
for NetworkService<B, H>
{
fn is_major_syncing(&mut self) -> bool {
NetworkService::is_major_syncing(self)
}
// Considered offline when we have zero connected peers.
fn is_offline(&mut self) -> bool {
self.num_connected.load(Ordering::Relaxed) == 0
}
}
/// Same `SyncOracle` implementation for shared references, since the trait
/// takes `&mut self` but these methods only need `&self`.
impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle
for &'a NetworkService<B, H>
{
fn is_major_syncing(&mut self) -> bool {
NetworkService::is_major_syncing(self)
}
// Considered offline when we have zero connected peers.
fn is_offline(&mut self) -> bool {
self.num_connected.load(Ordering::Relaxed) == 0
}
}
/// Exposes the external addresses and local peer ID of the node.
impl<B, H> NetworkStateInfo for NetworkService<B, H>
where
B: sp_runtime::traits::Block,
H: ExHashT,
{
/// Returns the node's currently-known external addresses.
fn external_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses.lock().clone()
}
/// Returns the local peer ID.
fn local_peer_id(&self) -> PeerId {
self.local_peer_id.clone()
}
}
/// A pre-resolved target for sending asynchronous notifications: holds the
/// sink for one peer/protocol pair plus the protocol name to send under.
#[must_use]
pub struct NotificationSender {
sink: NotificationsSink,
/// Name of the protocol on the wire.
protocol_name: Cow<'static, str>,
/// Field extracted from the service's metrics, if metrics are enabled.
notification_size_metric: Option<Histogram>,
}
impl NotificationSender {
/// Waits until the sink accepts a new notification, then returns a
/// `NotificationSenderReady` that can actually perform the send.
///
/// # Errors
///
/// Returns `Closed` if the underlying substream was closed while waiting.
pub async fn ready<'a>(&'a self) -> Result<NotificationSenderReady<'a>, NotificationSenderError> {
Ok(NotificationSenderReady {
ready: match self.sink.reserve_notification(self.protocol_name.clone()).await {
Ok(r) => r,
Err(()) => return Err(NotificationSenderError::Closed),
},
notification_size_metric: self.notification_size_metric.clone(),
})
}
}
/// Reserved slot in a notifications sink, ready to accept exactly one send.
#[must_use]
pub struct NotificationSenderReady<'a> {
ready: Ready<'a>,
/// Histogram of outbound notification sizes, if metrics are enabled.
notification_size_metric: Option<Histogram>,
}
impl<'a> NotificationSenderReady<'a> {
/// Consumes the reserved slot and sends the notification.
///
/// # Errors
///
/// Returns `Closed` if the underlying substream was closed.
pub fn send(self, notification: impl Into<Vec<u8>>) -> Result<(), NotificationSenderError> {
let notification = notification.into();
// Record the size before the buffer is moved into the send.
if let Some(notification_size_metric) = &self.notification_size_metric {
notification_size_metric.observe(notification.len() as f64);
}
self.ready
.send(notification)
.map_err(|()| NotificationSenderError::Closed)
}
}
/// Error returned by `NotificationSender::ready` and
/// `NotificationSenderReady::send`.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum NotificationSenderError {
/// The notifications substream to the peer is closed.
Closed,
/// No protocol name is registered for the requested engine ID.
BadProtocol,
}
/// Messages sent from the `NetworkService` (public handle) to the
/// `NetworkWorker` (background task) through the `to_worker` channel.
enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
/// Propagate the transaction with this hash.
PropagateTransaction(H),
/// Propagate all pending transactions.
PropagateTransactions,
/// Request a justification for the given block.
RequestJustification(B::Hash, NumberFor<B>),
/// Announce a block with optional associated data.
AnnounceBlock(B::Hash, Vec<u8>),
/// Start a DHT value lookup.
GetValue(record::Key),
/// Start a DHT value insertion.
PutValue(record::Key, Vec<u8>),
/// Register an address as belonging to a peer.
AddKnownAddress(PeerId, Multiaddr),
/// Configure a sync-fork request towards the given peers.
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
/// Register a new sink for the events stream.
EventStream(out_events::Sender),
/// Send a request-response request and report the result back.
Request {
target: PeerId,
protocol: Cow<'static, str>,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Register a notifications protocol with the behaviour.
RegisterNotifProtocol {
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
},
/// Disconnect from the given peer.
DisconnectPeer(PeerId),
/// The chain state has changed.
UpdateChain,
/// One of our own blocks was imported.
OwnBlockImported(B::Hash, NumberFor<B>),
}
/// The background task that drives the networking. Implements `Future` and
/// must be polled continuously; shares several `Arc`ed fields with the
/// `NetworkService` handle it created.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// External addresses, updated by the worker and read by the service.
external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
/// Number of connected peers, shared with the service.
num_connected: Arc<AtomicUsize>,
/// Major-sync flag, shared with the service.
is_major_syncing: Arc<AtomicBool>,
/// The public `NetworkService` handle associated with this worker.
service: Arc<NetworkService<B, H>>,
/// The libp2p swarm wrapping our network behaviour.
network_service: Swarm<B, H>,
/// Queue into which imported blocks and justifications are pushed.
import_queue: Box<dyn ImportQueue<B>>,
/// Receiving side of the instruction channel from the service handle.
from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
/// Incoming light-client requests, if `on_demand` was configured.
light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
/// Sinks of the registered event streams.
event_streams: out_events::OutChannels,
/// Prometheus metrics, if a registry was configured.
metrics: Option<Metrics>,
/// Peer IDs of the configured bootnodes.
boot_node_ids: Arc<HashSet<PeerId>>,
/// In-flight request-response requests: response channel, start time (for
/// duration metrics), and protocol name.
pending_requests: HashMap<
behaviour::RequestId,
(oneshot::Sender<Result<Vec<u8>, RequestFailure>>, Instant, String)
>,
/// Notifications sinks per peer/protocol, shared with the service.
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
}
impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
let this = &mut *self;
this.import_queue.poll_actions(cx, &mut NetworkLink {
protocol: &mut this.network_service,
});
if let Some(light_client_rqs) = this.light_client_rqs.as_mut() {
while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) {
if this.network_service.light_client_request(rq).is_err() {
log::warn!("Couldn't start light client request: too many pending requests");
}
if let Some(metrics) = this.metrics.as_ref() {
metrics.issued_light_requests.inc();
}
}
}
let mut num_iterations = 0;
loop {
num_iterations += 1;
if num_iterations >= 100 {
cx.waker().wake_by_ref();
break;
}
let msg = match this.from_service.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
};
match msg {
ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
this.network_service.user_protocol_mut().announce_block(hash, data),
ServiceToWorkerMsg::RequestJustification(hash, number) =>
this.network_service.user_protocol_mut().request_justification(&hash, number),
ServiceToWorkerMsg::PropagateTransaction(hash) =>
this.network_service.user_protocol_mut().propagate_transaction(&hash),
ServiceToWorkerMsg::PropagateTransactions =>
this.network_service.user_protocol_mut().propagate_transactions(),
ServiceToWorkerMsg::GetValue(key) =>
this.network_service.get_value(&key),
ServiceToWorkerMsg::PutValue(key, value) =>
this.network_service.put_value(key, value),
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
this.network_service.add_known_address(peer_id, addr),
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) =>
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
match this.network_service.send_request(&target, &protocol, request) {
Ok(request_id) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
this.pending_requests.insert(
request_id,
(pending_response, Instant::now(), protocol.to_string())
);
},
Err(behaviour::SendRequestError::NotConnected) => {
let err = RequestFailure::Network(OutboundFailure::ConnectionClosed);
let _ = pending_response.send(Err(err));
},
Err(behaviour::SendRequestError::UnknownProtocol) => {
let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols);
let _ = pending_response.send(Err(err));
},
}
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
this.network_service
.register_notifications_protocol(engine_id, protocol_name);
},
ServiceToWorkerMsg::DisconnectPeer(who) =>
this.network_service.user_protocol_mut().disconnect_peer(&who),
ServiceToWorkerMsg::UpdateChain =>
this.network_service.user_protocol_mut().update_chain(),
ServiceToWorkerMsg::OwnBlockImported(hash, number) =>
this.network_service.user_protocol_mut().own_block_imported(hash, number),
}
}
let mut num_iterations = 0;
loop {
num_iterations += 1;
if num_iterations >= 1000 {
cx.waker().wake_by_ref();
break;
}
let next_event = this.network_service.next_event();
futures::pin_mut!(next_event);
let poll_value = next_event.poll_unpin(cx);
match poll_value {
Poll::Pending => break,
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_blocks_submitted.inc();
}
this.import_queue.import_blocks(origin, blocks);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_justifications_submitted.inc();
}
this.import_queue.import_justification(origin, hash, nb, justification);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_finality_proofs_submitted.inc();
}
this.import_queue.import_finality_proof(origin, hash, nb, proof);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
match result {
Ok(serve_time) => {
metrics.requests_in_success_total
.with_label_values(&[&protocol])
.observe(serve_time.as_secs_f64());
}
Err(err) => {
let reason = match err {
ResponseFailure::Busy => "busy",
ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
"unsupported",
ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
"connection-closed",
};
metrics.requests_in_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => {
if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) {
if let Some(metrics) = this.metrics.as_ref() {
match &result {
Ok(_) => {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(started.elapsed().as_secs_f64());
}
Err(err) => {
let reason = match err {
RequestFailure::Refused => "refused",
RequestFailure::Network(OutboundFailure::DialFailure) =>
"dial-failure",
RequestFailure::Network(OutboundFailure::Timeout) =>
"timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed",
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_out_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
let _ = send_back.send(result);
} else {
error!("Request not in pending_requests");
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(request_duration.as_secs_f64());
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.kademlia_random_queries_total
.with_label_values(&[&protocol.as_ref()])
.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { remote, engine_id, notifications_sink, role })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_streams_opened_total
.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id)]).inc();
}
{
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
peers_notifications_sinks.insert((remote.clone(), engine_id), notifications_sink);
}
this.event_streams.send(Event::NotificationStreamOpened {
remote,
engine_id,
role,
});
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { remote, engine_id, notifications_sink })) => {
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
if let Some(s) = peers_notifications_sinks.get_mut(&(remote, engine_id)) {
*s = notifications_sink;
} else {
log::error!(
target: "sub-libp2p",
"NotificationStreamReplaced for non-existing substream"
);
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, engine_id })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_streams_closed_total
.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id[..])]).inc();
}
this.event_streams.send(Event::NotificationStreamClosed {
remote: remote.clone(),
engine_id,
});
{
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
peers_notifications_sinks.remove(&(remote.clone(), engine_id));
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages })) => {
if let Some(metrics) = this.metrics.as_ref() {
for (engine_id, message) in &messages {
metrics.notifications_sizes
.with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
.observe(message.len() as f64);
}
}
this.event_streams.send(Event::NotificationsReceived {
remote,
messages,
});
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => {
if let Some(metrics) = this.metrics.as_ref() {
let query_type = match event {
DhtEvent::ValueFound(_) => "value-found",
DhtEvent::ValueNotFound(_) => "value-not-found",
DhtEvent::ValuePut(_) => "value-put",
DhtEvent::ValuePutFailed(_) => "value-put-failed",
};
metrics.kademlia_query_duration.with_label_values(&[query_type])
.observe(duration.as_secs_f64());
}
this.event_streams.send(Event::Dht(event));
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
if let Some(metrics) = this.metrics.as_ref() {
let direction = match endpoint {
ConnectedPoint::Dialer { .. } => "out",
ConnectedPoint::Listener { .. } => "in",
};
metrics.connections_opened_total.with_label_values(&[direction]).inc();
if num_established.get() == 1 {
metrics.distinct_peers_connections_opened_total.inc();
}
}
},
Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, num_established }) => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause);
if let Some(metrics) = this.metrics.as_ref() {
let direction = match endpoint {
ConnectedPoint::Dialer { .. } => "out",
ConnectedPoint::Listener { .. } => "in",
};
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
};
metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
if num_established == 0 {
metrics.distinct_peers_connections_closed_total.inc();
}
}
},
Poll::Ready(SwarmEvent::NewListenAddr(addr)) => {
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.inc();
}
},
Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) => {
info!(target: "sub-libp2p", "📪 No longer listening on {}", addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.dec();
}
},
Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error, .. }) => {
trace!(
target: "sub-libp2p", "Libp2p => Failed to reach {:?} through {:?}: {}",
peer_id,
address,
error,
);
if this.boot_node_ids.contains(&peer_id) {
if let PendingConnectionError::InvalidPeerId = error {
error!(
"💔 The bootnode you want to connect to at `{}` provided a different peer ID than the one you expect: `{}`.",
address,
peer_id,
);
}
}
if let Some(metrics) = this.metrics.as_ref() {
match error {
PendingConnectionError::ConnectionLimit(_) =>
metrics.pending_connections_errors_total.with_label_values(&["limit-reached"]).inc(),
PendingConnectionError::InvalidPeerId =>
metrics.pending_connections_errors_total.with_label_values(&["invalid-peer-id"]).inc(),
PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) =>
metrics.pending_connections_errors_total.with_label_values(&["transport-error"]).inc(),
}
}
}
Poll::Ready(SwarmEvent::Dialing(peer_id)) =>
trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id),
Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))",
local_addr, send_back_addr);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_total.inc();
}
},
Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}",
local_addr, send_back_addr, error);
if let Some(metrics) = this.metrics.as_ref() {
let reason = match error {
PendingConnectionError::ConnectionLimit(_) => "limit-reached",
PendingConnectionError::InvalidPeerId => "invalid-peer-id",
PendingConnectionError::Transport(_) |
PendingConnectionError::IO(_) => "transport-error",
};
metrics.incoming_connections_errors_total.with_label_values(&[reason]).inc();
}
},
Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => {
trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). Connected via {:?}.",
peer_id, endpoint);
if let Some(metrics) = this.metrics.as_ref() {
metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
}
},
Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) =>
trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}",
address, error),
Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses }) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_local_addresses.sub(addresses.len() as u64);
}
let addrs = addresses.into_iter().map(|a| a.to_string())
.collect::<Vec<_>>().join(", ");
match reason {
Ok(()) => error!(
target: "sub-libp2p",
"📪 Libp2p listener ({}) closed gracefully",
addrs
),
Err(e) => error!(
target: "sub-libp2p",
"📪 Libp2p listener ({}) closed: {}",
addrs, e
),
}
},
Poll::Ready(SwarmEvent::ListenerError { error }) => {
trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
if let Some(metrics) = this.metrics.as_ref() {
metrics.listeners_errors_total.inc();
}
},
};
}
let num_connected_peers = this.network_service.user_protocol_mut().num_connected_peers();
this.num_connected.store(num_connected_peers, Ordering::Relaxed);
{
let external_addresses = Swarm::<B, H>::external_addresses(&this.network_service).cloned().collect();
*this.external_addresses.lock() = external_addresses;
}
let is_major_syncing = match this.network_service.user_protocol_mut().sync_state() {
SyncState::Idle => false,
SyncState::Downloading => true,
};
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
if let Some(metrics) = this.metrics.as_ref() {
for (proto, buckets) in this.network_service.num_entries_per_kbucket() {
for (lower_ilog2_bucket_bound, num_entries) in buckets {
metrics.kbuckets_num_nodes
.with_label_values(&[&proto.as_ref(), &lower_ilog2_bucket_bound.to_string()])
.set(num_entries as u64);
}
}
for (proto, num_entries) in this.network_service.num_kademlia_records() {
metrics.kademlia_records_count.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
metrics.kademlia_records_sizes_total.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
}
Poll::Pending
}
}
/// Marker impl: a `NetworkWorker` may be moved freely even after it has been
/// polled (no self-referential state is exposed through `Pin`).
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {}
/// Renders a byte slice as a string without allocating in the common case.
///
/// If `id` is valid UTF-8 the bytes are borrowed as-is; otherwise the raw byte
/// values are rendered via their `Debug` representation (e.g. `[255, 254]`).
pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
	match std::str::from_utf8(id) {
		Ok(valid) => Cow::Borrowed(valid),
		Err(_) => Cow::Owned(format!("{:?}", id)),
	}
}
type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;
/// Bridges the import queue's [`Link`] callbacks to the network: holds a mutable
/// borrow of the swarm so import results can be forwarded to the user protocol.
struct NetworkLink<'a, B: BlockT, H: ExHashT> {
	// Mutable handle to the swarm; all `Link` methods delegate through
	// `user_protocol_mut()` on this field.
	protocol: &'a mut Swarm<B, H>,
}
/// Forwards import-queue events to the user protocol living inside the swarm.
impl<'a, B: BlockT, H: ExHashT> Link<B> for NetworkLink<'a, B, H> {
	fn blocks_processed(
		&mut self,
		imported: usize,
		count: usize,
		results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
	) {
		// Straight delegation: the protocol tracks per-peer block-import outcomes.
		self.protocol.user_protocol_mut().on_blocks_processed(imported, count, results)
	}

	fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
		self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
		if success {
			return;
		}
		// Sending a bad justification is treated as a serious offence: disconnect
		// the peer and give it a fatal reputation change.
		info!("💔 Invalid justification provided by {} for #{}", who, hash);
		self.protocol.user_protocol_mut().disconnect_peer(&who);
		self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid justification"));
	}

	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		self.protocol.user_protocol_mut().request_justification(hash, number)
	}

	fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		self.protocol.user_protocol_mut().request_finality_proof(hash, number)
	}

	fn finality_proof_imported(
		&mut self,
		who: PeerId,
		request_block: (B::Hash, NumberFor<B>),
		finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
	) {
		// Remember the outcome before `finalization_result` is moved below.
		let failed = finalization_result.is_err();
		self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
		if !failed {
			return;
		}
		// Same policy as for bad justifications: drop and penalise the sender.
		info!("💔 Invalid finality proof provided by {} for #{}", who, request_block.0);
		self.protocol.user_protocol_mut().disconnect_peer(&who);
		self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid finality proof"));
	}
}
/// Verifies that the given addresses match the chosen transport, and returns an
/// error otherwise.
///
/// With [`TransportConfig::MemoryOnly`], every address must consist purely of
/// `/memory/…` components; with any other transport, `/memory/…` components are
/// the ones that are forbidden. The two cases are the same check with the
/// polarity of the `Memory` test flipped, so we compute the predicate once
/// instead of duplicating the filtering logic per branch.
///
/// # Errors
///
/// Returns [`Error::AddressesForAnotherTransport`] listing the offending
/// addresses if any address is inconsistent with `transport`.
fn ensure_addresses_consistent_with_transport<'a>(
	addresses: impl Iterator<Item = &'a Multiaddr>,
	transport: &TransportConfig,
) -> Result<(), Error> {
	let memory_only = matches!(transport, TransportConfig::MemoryOnly);

	// An address is invalid when it contains a component whose "is memory"
	// status disagrees with the transport: a non-memory component under
	// `MemoryOnly`, or a memory component under any other transport.
	let invalid: Vec<_> = addresses
		.filter(|addr| addr.iter()
			.any(|proto| matches!(proto, libp2p::core::multiaddr::Protocol::Memory(_)) != memory_only)
		)
		.cloned()
		.collect();

	if !invalid.is_empty() {
		return Err(Error::AddressesForAnotherTransport {
			transport: transport.clone(),
			addresses: invalid,
		});
	}

	Ok(())
}