mod behaviour;
mod registry;
#[cfg(test)]
mod test;
mod upgrade;
pub mod protocols_handler;
pub mod toggle;
pub use behaviour::{
NetworkBehaviour,
NetworkBehaviourAction,
NetworkBehaviourEventProcess,
PollParameters,
NotifyHandler,
DialPeerCondition
};
pub use protocols_handler::{
IntoProtocolsHandler,
IntoProtocolsHandlerSelect,
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerSelect,
ProtocolsHandlerUpgrErr,
OneShotHandler,
OneShotHandlerConfig,
SubstreamProtocol
};
use protocols_handler::{
NodeHandlerWrapperBuilder,
NodeHandlerWrapperError,
};
use futures::{
prelude::*,
executor::ThreadPoolBuilder,
stream::FusedStream,
};
use libp2p_core::{
Executor,
Transport,
Multiaddr,
Negotiated,
PeerId,
connection::{
ConnectionError,
ConnectionId,
ConnectionInfo,
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
IntoConnectionHandler,
ListenerId,
PendingConnectionError,
Substream
},
transport::{TransportError, boxed::Boxed as BoxTransport},
muxing::{StreamMuxer, StreamMuxerBox},
network::{
Network,
NetworkInfo,
NetworkEvent,
NetworkConfig,
peer::ConnectedPeer,
},
upgrade::ProtocolName,
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, hash::Hash, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;
/// Convenience alias for an [`ExpandedSwarm`] whose handler-related type
/// parameters are all derived from the behaviour.
///
/// Only the behaviour and, optionally, the connection info type (which
/// defaults to `PeerId`) have to be spelled out by the user.
pub type Swarm<TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TBehaviour,
// Inbound event type of the behaviour's protocols handler.
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
// Outbound event type of the behaviour's protocols handler.
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
// The (not yet instantiated) protocols handler of the behaviour.
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
TConnInfo,
>;
/// A `Substream` of the boxed stream muxer (`StreamMuxerBox`), wrapped in
/// `Negotiated`.
// NOTE(review): presumably the substream type protocol upgrades operate on
// once a protocol has been negotiated — confirm against `libp2p_core`.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
/// Event returned by `ExpandedSwarm::next_event` (and produced internally by
/// `poll_next_event`).
///
/// `TBvEv` is the behaviour's output event type and `THandleErr` the error
/// type of its protocols handler.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
/// Event generated by the `NetworkBehaviour`
/// (`NetworkBehaviourAction::GenerateEvent`).
Behaviour(TBvEv),
/// A connection to a peer that is not banned has been established.
ConnectionEstablished {
/// Identity of the peer the connection was established with.
peer_id: PeerId,
/// Endpoint of the connection that has been established.
endpoint: ConnectedPoint,
/// Number of established connections to this peer as reported by the
/// network. A value of 1 coincides with the behaviour's
/// `inject_connected` being called.
num_established: NonZeroU32,
},
/// A connection with the given peer has been closed.
ConnectionClosed {
/// Identity of the peer the connection belonged to.
peer_id: PeerId,
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
/// Number of established connections to this peer as reported by the
/// network. A value of 0 coincides with the behaviour's
/// `inject_disconnected` being called.
num_established: u32,
/// Error that caused the connection to close; `None` is logged as an
/// "active close".
cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
},
/// A new incoming connection was reported by the network. The swarm has
/// already attempted to accept it (a rejection is only logged).
IncomingConnection {
/// Local address of the incoming connection.
local_addr: Multiaddr,
/// Address to use to send data back to the remote.
send_back_addr: Multiaddr,
},
/// An incoming connection failed before being established.
IncomingConnectionError {
/// Local address of the failed incoming connection.
local_addr: Multiaddr,
/// Send-back address of the failed incoming connection.
send_back_addr: Multiaddr,
/// The error that occurred.
error: PendingConnectionError<io::Error>,
},
/// A connection to a banned peer was established and has been
/// disconnected again right away.
BannedPeer {
/// Identity of the banned peer.
peer_id: PeerId,
/// Endpoint of the connection that was established and then closed.
endpoint: ConnectedPoint,
},
/// Dialing an address of a known peer failed.
UnreachableAddr {
/// Peer that was being dialed.
peer_id: PeerId,
/// Address that could not be reached.
address: Multiaddr,
/// Error that occurred while trying to reach the address.
error: PendingConnectionError<io::Error>,
/// Number of remaining attempts for this dial as reported by the
/// network. When it is 0 the behaviour's `inject_dial_failure` is
/// called as well.
attempts_remaining: u32,
},
/// Dialing an address failed, and the identity of the peer behind that
/// address is not known.
UnknownPeerUnreachableAddr {
/// Address that could not be reached.
address: Multiaddr,
/// Error that occurred while trying to reach the address.
error: PendingConnectionError<io::Error>,
},
/// A listener reported a new address it is listening on.
NewListenAddr(Multiaddr),
/// A listener reported that one of its addresses has expired.
ExpiredListenAddr(Multiaddr),
/// A listener has been closed.
ListenerClosed {
/// Addresses the listener was listening on. The behaviour's
/// `inject_expired_listen_addr` is called for each of them.
addresses: Vec<Multiaddr>,
/// Reason for the closure: `Ok(())` or the I/O error that closed the
/// listener.
reason: Result<(), io::Error>,
},
/// A listener reported an error. The listener is not reported as closed
/// by this event.
ListenerError {
/// The error reported by the listener.
error: io::Error,
},
/// A dialing attempt to the given peer, requested by the behaviour via
/// `NetworkBehaviourAction::DialPeer`, has been started.
Dialing(PeerId),
}
/// Combines a `Network` with a `NetworkBehaviour`: network events are fed
/// into the behaviour and the actions requested by the behaviour are executed
/// on the network.
///
/// Most users name this type through the [`Swarm`] alias. The swarm
/// dereferences to the behaviour, which is why its own operations are
/// associated functions taking `me` instead of methods taking `self`.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo = PeerId>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
/// The underlying network (listeners and connections) that is polled for
/// events.
network: Network<
BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
TConnInfo,
PeerId,
>,
/// The user-supplied behaviour; also reachable through `Deref`/`DerefMut`.
behaviour: TBehaviour,
/// Names of the protocols of the behaviour's handler, collected once in
/// `SwarmBuilder::build` and exposed to the behaviour via `PollParameters`.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// Addresses the listeners reported as active; entries are added on
/// `NewListenerAddress` and removed again when reported as expired.
listened_addrs: SmallVec<[Multiaddr; 8]>,
/// External addresses, added through `add_external_address` or reported
/// by the behaviour via `ReportObservedAddr`.
external_addrs: Addresses,
/// Peers that are banned: they are never dialed and connections to them
/// are closed as soon as they are established.
banned_peers: HashSet<PeerId>,
/// Handler notification requested by the behaviour that could not be
/// delivered yet. It is retried before the behaviour is polled again.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>
}
/// Lets the behaviour's own methods be called directly on the swarm.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Deref
    for ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    TConnInfo: ConnectionInfo<PeerId = PeerId>,
    THandler: IntoProtocolsHandler,
{
    type Target = TBehaviour;

    /// Returns a shared reference to the behaviour owned by the swarm.
    fn deref(&self) -> &TBehaviour {
        &self.behaviour
    }
}
/// Mutable counterpart of the `Deref` impl: grants mutable access to the
/// behaviour through the swarm.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> DerefMut
    for ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    TConnInfo: ConnectionInfo<PeerId = PeerId>,
    THandler: IntoProtocolsHandler,
{
    /// Returns an exclusive reference to the behaviour owned by the swarm.
    fn deref_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
}
/// Marks the swarm as `Unpin` regardless of whether its type parameters are.
///
/// This is what allows the rest of this file to build a `Pin<&mut Self>` from
/// a plain mutable reference with `Pin::new`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
{
/// Builds a new swarm with the default network configuration.
///
/// Shortcut for `SwarmBuilder::new(transport, behaviour, local_peer_id).build()`;
/// use [`SwarmBuilder`] directly to customise limits or the executor.
pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
where
    TMuxer: StreamMuxer + Send + Sync + 'static,
    // The bound on `OutboundSubstream` used to be listed twice; once is enough.
    <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
    <TMuxer as StreamMuxer>::Substream: Send + 'static,
    TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
    TTransport::Error: Send + Sync + 'static,
    TTransport::Listener: Send + 'static,
    TTransport::ListenerUpgrade: Send + 'static,
    TTransport::Dial: Send + 'static,
{
    SwarmBuilder::new(transport, behaviour, local_peer_id)
        .build()
}
/// Returns the `NetworkInfo` of the underlying `Network`.
pub fn network_info(me: &Self) -> NetworkInfo {
me.network.info()
}
/// Asks the underlying network to start listening on `addr`.
///
/// Returns the identifier of the new listener, or the transport error
/// reported by `Network::listen_on`.
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
me.network.listen_on(addr)
}
/// Removes the listener identified by `id` by delegating to
/// `Network::remove_listener`, whose result is returned unchanged.
// NOTE(review): `Err(())` presumably means that no such listener exists —
// confirm against `libp2p_core::Network::remove_listener`.
pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
me.network.remove_listener(id)
}
/// Starts dialing the given address without knowing the identity of the
/// remote peer.
///
/// A fresh handler is requested from the behaviour for the new connection.
/// Fails with `ConnectionLimit` if the network refuses the dial.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
    let builder = me.behaviour.new_handler().into_node_handler_builder();
    // The identifier of the pending connection is of no interest to callers.
    me.network.dial(&addr, builder)?;
    Ok(())
}
/// Starts dialing a known peer, using the addresses the behaviour knows for it.
///
/// # Errors
///
/// * `DialError::Banned` if the peer is banned.
/// * `DialError::NoAddresses` if the behaviour returns no address that is not
///   one of our own listen addresses.
/// * `DialError::ConnectionLimit` if the network refuses the dial.
///
/// In every error case the behaviour's `inject_dial_failure` is called.
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
    if me.banned_peers.contains(peer_id) {
        me.behaviour.inject_dial_failure(peer_id);
        return Err(DialError::Banned);
    }

    // Never dial our own listen addresses.
    let own_addrs = &me.listened_addrs;
    let mut candidates = me
        .behaviour
        .addresses_of_peer(peer_id)
        .into_iter()
        .filter(|addr| !own_addrs.contains(addr));

    let outcome = match candidates.next() {
        None => Err(DialError::NoAddresses),
        Some(first) => {
            let handler = me.behaviour.new_handler().into_node_handler_builder();
            // The remaining candidates are handed to the same dialing attempt.
            me.network
                .peer(peer_id.clone())
                .dial(first, candidates, handler)
                .map(|_| ())
                .map_err(DialError::ConnectionLimit)
        }
    };

    if let Err(error) = &outcome {
        log::debug!(
            "New dialing attempt to peer {:?} failed: {:?}.",
            peer_id, error);
        me.behaviour.inject_dial_failure(peer_id);
    }

    outcome
}
/// Returns an iterator over the listen addresses reported by the underlying
/// network (`Network::listen_addrs`).
pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
me.network.listen_addrs()
}
/// Returns an iterator over the external addresses currently stored in the
/// swarm.
pub fn external_addresses(me: &Self) -> impl Iterator<Item = &Multiaddr> {
me.external_addrs.iter()
}
/// Returns the peer ID of the local node, as known to the underlying network.
pub fn local_peer_id(me: &Self) -> &PeerId {
    me.network.local_peer_id()
}
/// Adds `addr` to the external addresses stored in the swarm.
///
/// Unlike addresses reported through
/// `NetworkBehaviourAction::ReportObservedAddr`, the behaviour's
/// `inject_new_external_addr` is not called here.
pub fn add_external_address(me: &mut Self, addr: Multiaddr) {
me.external_addrs.add(addr)
}
/// Returns a copy of the connection info of one of the established
/// connections to `peer_id`, or `None` if the peer is not connected.
pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<TConnInfo> {
    let mut connected = me.network.peer(peer_id.clone()).into_connected()?;
    // Bind the clone first so the borrowed connection handle is released
    // before `connected` goes out of scope.
    let info = connected.some_connection().info().clone();
    Some(info)
}
/// Bans a peer.
///
/// If the peer was not banned before and is currently connected, it is
/// disconnected. Banning an already banned peer does nothing.
pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
    let newly_banned = me.banned_peers.insert(peer_id.clone());
    if !newly_banned {
        return;
    }
    if let Some(connected) = me.network.peer(peer_id).into_connected() {
        connected.disconnect();
    }
}
/// Lifts the ban on a peer. Does nothing if the peer was not banned; no
/// connection is (re-)established by this call.
pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
me.banned_peers.remove(&peer_id);
}
/// Waits for and returns the next event produced by the swarm, whatever its
/// kind (see [`SwarmEvent`]).
pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
// Adapt the poll-based `poll_next_event` into a future.
future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
}
pub async fn next(&mut self) -> TBehaviour::OutEvent {
future::poll_fn(move |cx| {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(event);
}
}
}).await
}
/// Drives the swarm and returns the next [`SwarmEvent`], if any.
///
/// Each iteration of the internal loop
///
/// 1. polls the network once and forwards the resulting event to the
///    behaviour, returning a `SwarmEvent` for most of them,
/// 2. retries the delivery of a handler notification left over from a
///    previous iteration (`pending_event`),
/// 3. polls the behaviour and executes the action it requests.
///
/// `Poll::Pending` is returned when neither the network nor the behaviour is
/// ready, or when a handler notification cannot be delivered yet.
///
/// When a listener is closed, its addresses are removed from
/// `listened_addrs`, the same way as for individually expired addresses, so
/// that the behaviour does not keep seeing them through `PollParameters` and
/// `dial` does not keep filtering them out as own addresses.
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
{
    // Work on a plain `&mut Self` so that distinct fields can be borrowed
    // independently of each other below.
    let this = &mut *self;

    loop {
        let mut network_not_ready = false;

        // Step 1: let the network make progress.
        match this.network.poll(cx) {
            Poll::Pending => network_not_ready = true,
            Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
                let peer = connection.peer_id().clone();
                let connection = connection.id();
                this.behaviour.inject_event(peer, connection, event);
            },
            Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
                let peer = connection.peer_id();
                let connection = connection.id();
                this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
            },
            Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
                let peer_id = connection.peer_id().clone();
                let endpoint = connection.endpoint().clone();
                if this.banned_peers.contains(&peer_id) {
                    // Connections to banned peers are closed right away and
                    // never reported to the behaviour.
                    this.network.peer(peer_id.clone())
                        .into_connected()
                        .expect("the Network just notified us that we were connected; QED")
                        .disconnect();
                    return Poll::Ready(SwarmEvent::BannedPeer {
                        peer_id,
                        endpoint,
                    });
                } else {
                    log::debug!("Connection established: {:?}; Total (peer): {}.",
                        connection.connected(), num_established);
                    let endpoint = connection.endpoint().clone();
                    this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
                    // The first connection to a peer additionally counts as
                    // "peer connected".
                    if num_established.get() == 1 {
                        this.behaviour.inject_connected(&peer_id);
                    }
                    return Poll::Ready(SwarmEvent::ConnectionEstablished {
                        peer_id, num_established, endpoint
                    });
                }
            },
            Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
                if let Some(error) = error.as_ref() {
                    log::debug!("Connection {:?} closed: {:?}", connected, error);
                } else {
                    log::debug!("Connection {:?} closed (active close).", connected);
                }
                let info = connected.info;
                let endpoint = connected.endpoint;
                this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint);
                // The last connection to a peer additionally counts as
                // "peer disconnected".
                if num_established == 0 {
                    this.behaviour.inject_disconnected(info.peer_id());
                }
                return Poll::Ready(SwarmEvent::ConnectionClosed {
                    peer_id: info.peer_id().clone(),
                    endpoint,
                    cause: error,
                    num_established,
                });
            },
            Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
                let handler = this.behaviour.new_handler();
                let local_addr = connection.local_addr.clone();
                let send_back_addr = connection.send_back_addr.clone();
                // A rejection is only logged; the event is reported either way.
                if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) {
                    log::warn!("Incoming connection rejected: {:?}", e);
                }
                return Poll::Ready(SwarmEvent::IncomingConnection {
                    local_addr,
                    send_back_addr,
                });
            },
            Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
                log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
                if !this.listened_addrs.contains(&listen_addr) {
                    this.listened_addrs.push(listen_addr.clone())
                }
                this.behaviour.inject_new_listen_addr(&listen_addr);
                return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
            }
            Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
                log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
                this.listened_addrs.retain(|a| a != &listen_addr);
                this.behaviour.inject_expired_listen_addr(&listen_addr);
                return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
            }
            Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
                log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
                for addr in addresses.iter() {
                    // The addresses of a closed listener are expired: forget
                    // them, exactly as for `ExpiredListenerAddress` above.
                    // (As there, an address shared with another listener is
                    // dropped too.)
                    this.listened_addrs.retain(|a| a != addr);
                    this.behaviour.inject_expired_listen_addr(addr);
                }
                this.behaviour.inject_listener_closed(listener_id, match &reason {
                    Ok(()) => Ok(()),
                    Err(err) => Err(err),
                });
                return Poll::Ready(SwarmEvent::ListenerClosed {
                    addresses,
                    reason,
                });
            }
            Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
                this.behaviour.inject_listener_error(listener_id, &error);
                return Poll::Ready(SwarmEvent::ListenerError {
                    error,
                });
            },
            Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
                log::debug!("Incoming connection failed: {:?}", error);
                return Poll::Ready(SwarmEvent::IncomingConnectionError {
                    local_addr,
                    send_back_addr,
                    error,
                });
            },
            Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
                log::debug!(
                    "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
                    peer_id, multiaddr, error, attempts_remaining);
                this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
                // Only when no attempt is left has the dial as a whole failed.
                if attempts_remaining == 0 {
                    this.behaviour.inject_dial_failure(&peer_id);
                }
                return Poll::Ready(SwarmEvent::UnreachableAddr {
                    peer_id,
                    address: multiaddr,
                    error,
                    attempts_remaining,
                });
            },
            Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
                log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
                    multiaddr, error);
                this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
                return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
                    address: multiaddr,
                    error,
                });
            },
        }

        // Step 2: a notification left over from a previous iteration must be
        // delivered before the behaviour is polled again. If the peer (or the
        // targeted connection) is gone in the meantime, the event is dropped.
        if let Some((peer_id, handler, event)) = this.pending_event.take() {
            if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
                match handler {
                    PendingNotifyHandler::One(conn_id) =>
                        if let Some(mut conn) = peer.connection(conn_id) {
                            if let Some(event) = notify_one(&mut conn, event, cx) {
                                this.pending_event = Some((peer_id, handler, event));
                                return Poll::Pending
                            }
                        },
                    PendingNotifyHandler::Any(ids) => {
                        if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
                            let handler = PendingNotifyHandler::Any(ids);
                            this.pending_event = Some((peer_id, handler, event));
                            return Poll::Pending
                        }
                    }
                    PendingNotifyHandler::All(ids) => {
                        if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
                            let handler = PendingNotifyHandler::All(ids);
                            this.pending_event = Some((peer_id, handler, event));
                            return Poll::Pending
                        }
                    }
                }
            }
        }

        debug_assert!(this.pending_event.is_none());

        // Step 3: poll the behaviour and execute the action it requests.
        let behaviour_poll = {
            let mut parameters = SwarmPollParameters {
                local_peer_id: &mut this.network.local_peer_id(),
                supported_protocols: &this.supported_protocols,
                listened_addrs: &this.listened_addrs,
                external_addrs: &this.external_addrs
            };
            this.behaviour.poll(cx, &mut parameters)
        };

        match behaviour_poll {
            // Nothing can make progress right now.
            Poll::Pending if network_not_ready => return Poll::Pending,
            // The network produced something in this iteration: go around again.
            Poll::Pending => (),
            Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
                return Poll::Ready(SwarmEvent::Behaviour(event))
            },
            Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
                // A failure to start the dial is deliberately ignored here.
                let _ = ExpandedSwarm::dial_addr(&mut *this, address);
            },
            Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
                if this.banned_peers.contains(&peer_id) {
                    this.behaviour.inject_dial_failure(&peer_id);
                } else {
                    let condition_matched = match condition {
                        DialPeerCondition::Disconnected
                            if this.network.is_disconnected(&peer_id) => true,
                        DialPeerCondition::NotDialing
                            if !this.network.is_dialing(&peer_id) => true,
                        _ => false
                    };
                    if condition_matched {
                        if ExpandedSwarm::dial(this, &peer_id).is_ok() {
                            return Poll::Ready(SwarmEvent::Dialing(peer_id))
                        }
                    } else {
                        // No new attempt is started. If a dialing attempt is
                        // already in progress, extend it with the addresses
                        // currently known to the behaviour.
                        log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
                            peer_id, condition);
                        let self_listening = &this.listened_addrs;
                        if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
                            let addrs = this.behaviour.addresses_of_peer(peer.id());
                            let mut attempt = peer.some_attempt();
                            for a in addrs {
                                if !self_listening.contains(&a) {
                                    attempt.add_address(a);
                                }
                            }
                        }
                    }
                }
            },
            Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
                // If the peer is not connected (any more), the event is dropped.
                if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
                    match handler {
                        NotifyHandler::One(connection) => {
                            if let Some(mut conn) = peer.connection(connection) {
                                if let Some(event) = notify_one(&mut conn, event, cx) {
                                    let handler = PendingNotifyHandler::One(connection);
                                    this.pending_event = Some((peer_id, handler, event));
                                    return Poll::Pending
                                }
                            }
                        }
                        NotifyHandler::Any => {
                            let ids = peer.connections().into_ids().collect();
                            if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
                                let handler = PendingNotifyHandler::Any(ids);
                                this.pending_event = Some((peer_id, handler, event));
                                return Poll::Pending
                            }
                        }
                        NotifyHandler::All => {
                            let ids = peer.connections().into_ids().collect();
                            if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
                                let handler = PendingNotifyHandler::All(ids);
                                this.pending_event = Some((peer_id, handler, event));
                                return Poll::Pending
                            }
                        }
                    }
                }
            },
            Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
                for addr in this.network.address_translation(&address) {
                    // Only addresses not known before are announced to the
                    // behaviour; all of them are (re-)added to the collection.
                    if this.external_addrs.iter().all(|a| *a != addr) {
                        this.behaviour.inject_new_external_addr(&addr);
                    }
                    this.external_addrs.add(addr);
                }
            },
        }
    }
}
}
/// Target of a handler notification that is still waiting to be delivered.
///
/// Counterpart of `NotifyHandler` that additionally remembers which
/// connections were not ready when delivery was last attempted.
enum PendingNotifyHandler {
/// Deliver to the handler of this one connection.
One(ConnectionId),
/// Deliver to the handler of any one of these connections.
Any(SmallVec<[ConnectionId; 10]>),
/// Deliver to the handlers of all of these connections.
All(SmallVec<[ConnectionId; 10]>),
}
/// Tries to deliver `event` to the handler of a single connection.
///
/// Returns `Some(event)` if the handler is not ready to receive the event
/// yet, so that the caller can retry later. Returns `None` if the event was
/// handed over, or dropped because the readiness check reported an error.
// NOTE(review): an error from `poll_ready_notify_handler` presumably means
// the connection is closing — confirm against `libp2p_core`.
fn notify_one<'a, TInEvent, TConnInfo>(
    conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo>,
    event: TInEvent,
    cx: &mut Context<'_>,
) -> Option<TInEvent>
where
    TConnInfo: ConnectionInfo
{
    let readiness = match conn.poll_ready_notify_handler(cx) {
        Poll::Ready(readiness) => readiness,
        Poll::Pending => return Some(event),
    };
    if readiness.is_ok() {
        // A failure at this point is deliberately ignored.
        let _ = conn.notify_handler(event);
    }
    None
}
/// Tries to deliver `event` to the handler of the first connection in `ids`
/// that accepts it.
///
/// Returns `None` if the event was delivered, or if it could not be delivered
/// and no connection is left to retry on (the event is then dropped).
/// Otherwise returns the event together with the connections whose handlers
/// were not ready, for a later retry.
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
    ids: SmallVec<[ConnectionId; 10]>,
    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
    event: TInEvent,
    cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    let mut not_ready = SmallVec::new();
    // (1) The event is present at the start of every loop iteration.
    let mut undelivered = Some(event);

    for id in ids {
        let mut conn = match peer.connection(id) {
            Some(conn) => conn,
            // The connection no longer exists.
            None => continue,
        };
        match conn.poll_ready_notify_handler(cx) {
            Poll::Pending => not_ready.push(id),
            Poll::Ready(Err(())) => {}
            Poll::Ready(Ok(())) => {
                let e = undelivered.take().expect("by (1),(2)");
                match conn.notify_handler(e) {
                    // (2) Delivery failed: put the event back for the next
                    // candidate.
                    Err(returned) => undelivered = Some(returned),
                    Ok(_) => break,
                }
            }
        }
    }

    match undelivered {
        Some(e) if !not_ready.is_empty() => Some((e, not_ready)),
        _ => None,
    }
}
/// Tries to deliver a copy of `event` to the handlers of all connections in
/// `ids`.
///
/// Returns `None` once no connection is left to retry on. Otherwise returns
/// the event together with the connections whose handlers were not ready, so
/// that delivery to those can be retried later.
fn notify_all<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
    ids: SmallVec<[ConnectionId; 10]>,
    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
    event: TInEvent,
    cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    TInEvent: Clone,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    // With a single target the event can be moved instead of cloned.
    if ids.len() == 1 {
        if let Some(mut conn) = peer.connection(ids[0]) {
            return notify_one(&mut conn, event, cx).map(|e| (e, ids))
        }
    }

    let mut not_ready = SmallVec::new();
    for id in ids {
        let mut conn = match peer.connection(id) {
            Some(conn) => conn,
            // The connection no longer exists.
            None => continue,
        };
        match conn.poll_ready_notify_handler(cx) {
            Poll::Pending => not_ready.push(id),
            Poll::Ready(Ok(())) => {
                // A failure at this point is deliberately ignored.
                let _ = conn.notify_handler(event.clone());
            }
            Poll::Ready(Err(())) => {}
        }
    }

    if not_ready.is_empty() {
        None
    } else {
        Some((event, not_ready))
    }
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
type Item = TBehaviour::OutEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(Some(event));
}
}
}
}
/// The swarm's stream never terminates (its `poll_next` never yields `None`),
/// so it can always be polled again.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
/// Always returns `false`.
fn is_terminated(&self) -> bool {
false
}
}
/// Borrowed view of the swarm's state that is handed to the behaviour while
/// it is being polled; the swarm's implementation of `PollParameters`.
pub struct SwarmPollParameters<'a> {
/// Peer ID of the local node.
local_peer_id: &'a PeerId,
/// Protocol names supported by the behaviour's handler.
supported_protocols: &'a [Vec<u8>],
/// Addresses the swarm currently considers itself to be listening on.
listened_addrs: &'a [Multiaddr],
/// External addresses currently known to the swarm.
external_addrs: &'a Addresses,
}
/// Every iterator returned here owns its items: the borrowed data is copied
/// on each call.
impl<'a> PollParameters for SwarmPollParameters<'a> {
    type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
    type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
    type ExternalAddressesIter = AddressIntoIter;

    /// Returns an owned copy of the supported protocol names.
    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
        let names: Vec<Vec<u8>> = self.supported_protocols.iter().cloned().collect();
        names.into_iter()
    }

    /// Returns an owned copy of the current listen addresses.
    fn listened_addresses(&self) -> Self::ListenedAddressesIter {
        let addrs: Vec<Multiaddr> = self.listened_addrs.iter().cloned().collect();
        addrs.into_iter()
    }

    /// Returns an owned copy of the current external addresses.
    fn external_addresses(&self) -> Self::ExternalAddressesIter {
        let snapshot = self.external_addrs.clone();
        snapshot.into_iter()
    }

    /// Returns the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId {
        self.local_peer_id
    }
}
/// Builder for a [`Swarm`]: collects the transport, the behaviour, the local
/// peer ID and the network configuration before `build` assembles the swarm.
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
/// Peer ID of the local node.
local_peer_id: PeerId,
/// The transport, boxed, with its muxer boxed and its error mapped to
/// `io::Error`.
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
/// The behaviour the swarm will drive.
behaviour: TBehaviour,
/// Configuration passed to the underlying `Network`.
network_config: NetworkConfig,
}
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
where TBehaviour: NetworkBehaviour,
      TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
    /// Creates a builder from the given transport, behaviour and local peer
    /// ID, starting from the default network configuration.
    ///
    /// The transport is boxed, its stream muxer is wrapped in a
    /// `StreamMuxerBox` and its errors are converted to `io::Error`.
    pub fn new<TTrans, TMuxer>(transport: TTrans, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
    where
        TMuxer: StreamMuxer + Send + Sync + 'static,
        // The bound on `OutboundSubstream` used to be listed twice; once is enough.
        <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
        <TMuxer as StreamMuxer>::Substream: Send + 'static,
        TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
        TTrans::Error: Send + Sync + 'static,
        TTrans::Listener: Send + 'static,
        TTrans::ListenerUpgrade: Send + 'static,
        TTrans::Dial: Send + 'static,
    {
        let transport = transport
            .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
            .boxed();

        SwarmBuilder {
            local_peer_id,
            transport,
            behaviour,
            network_config: Default::default(),
        }
    }

    /// Sets the executor stored in the network configuration.
    ///
    /// If none is set, `build` tries to create a thread pool and use that.
    pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
        self.network_config.set_executor(e);
        self
    }

    /// Sets the notify-handler buffer size of the network configuration.
    // NOTE(review): presumably the number of events from the behaviour to a
    // connection handler that can be buffered — confirm against `NetworkConfig`.
    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
        self.network_config.set_notify_handler_buffer_size(n);
        self
    }

    /// Sets the connection-event buffer size of the network configuration.
    // NOTE(review): presumably the number of events from a connection handler
    // to the behaviour that can be buffered — confirm against `NetworkConfig`.
    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
        self.network_config.set_connection_event_buffer_size(n);
        self
    }

    /// Sets the incoming connection limit of the network configuration.
    pub fn incoming_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_incoming_limit(n);
        self
    }

    /// Sets the outgoing connection limit of the network configuration.
    pub fn outgoing_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_outgoing_limit(n);
        self
    }

    /// Sets the limit on established connections per peer of the network
    /// configuration.
    pub fn peer_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_established_per_peer_limit(n);
        self
    }

    /// Assembles the swarm.
    ///
    /// The supported protocol names are collected once from a handler created
    /// by the behaviour. If no executor has been configured, a thread pool is
    /// created for that purpose; if that fails, a warning is logged and the
    /// network is built without an executor.
    pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
        let supported_protocols = self.behaviour
            .new_handler()
            .inbound_protocol()
            .protocol_info()
            .into_iter()
            .map(|info| info.protocol_name().to_vec())
            .collect();

        let mut network_cfg = self.network_config;

        // Fall back to a thread pool when the user did not provide an executor.
        if network_cfg.executor().is_none() {
            match ThreadPoolBuilder::new()
                .name_prefix("libp2p-swarm-task-")
                .create()
            {
                Ok(tp) => {
                    network_cfg.set_executor(Box::new(move |f| tp.spawn_ok(f)));
                },
                Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err)
            }
        }

        let network = Network::new(self.transport, self.local_peer_id, network_cfg);

        ExpandedSwarm {
            network,
            behaviour: self.behaviour,
            supported_protocols,
            listened_addrs: SmallVec::new(),
            external_addrs: Addresses::default(),
            banned_peers: HashSet::new(),
            pending_event: None
        }
    }
}
/// Reasons why `ExpandedSwarm::dial` can fail to start a dialing attempt.
#[derive(Debug)]
pub enum DialError {
/// The peer is banned.
Banned,
/// The network refused the dial because a connection limit was reached.
ConnectionLimit(ConnectionLimit),
/// The behaviour returned no address for the peer that is not one of our
/// own listen addresses.
NoAddresses
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::Banned => write!(f, "Dial error: peer is banned.")
}
}
}
impl error::Error for DialError {
    /// Returns the underlying connection limit error, if that is what caused
    /// the dial to fail; the other variants have no source.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            DialError::NoAddresses | DialError::Banned => None,
            DialError::ConnectionLimit(limit) => Some(limit),
        }
    }
}
/// A `NetworkBehaviour` that does nothing: it knows no addresses, ignores
/// everything injected into it and is always pending when polled.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
impl NetworkBehaviour for DummyBehaviour {
    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
    type OutEvent = void::Void;

    /// Creates a default dummy handler.
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        Default::default()
    }

    /// Knows no address of any peer.
    fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
        vec![]
    }

    // All notifications are ignored.

    fn inject_connected(&mut self, _peer_id: &PeerId) {}

    fn inject_connection_established(
        &mut self,
        _peer_id: &PeerId,
        _connection: &ConnectionId,
        _endpoint: &ConnectedPoint,
    ) {
    }

    fn inject_disconnected(&mut self, _peer_id: &PeerId) {}

    fn inject_connection_closed(
        &mut self,
        _peer_id: &PeerId,
        _connection: &ConnectionId,
        _endpoint: &ConnectedPoint,
    ) {
    }

    fn inject_event(
        &mut self,
        _peer_id: PeerId,
        _connection: ConnectionId,
        _event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
    ) {
    }

    /// Never requests any action.
    fn poll(
        &mut self,
        _cx: &mut Context<'_>,
        _params: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>>
    {
        Poll::Pending
    }
}
// Unit tests for the swarm builder and for connection / ban bookkeeping.
#[cfg(test)]
mod tests {
use crate::protocols_handler::DummyProtocolsHandler;
use crate::test::{MockBehaviour, CallTraceBehaviour};
use futures::{future, executor};
use libp2p_core::{
identity,
upgrade,
multiaddr,
transport::{self, dummy::*}
};
use libp2p_mplex::Multiplex;
use libp2p_noise as noise;
use super::*;
/// Generates the public key of a fresh ed25519 keypair.
fn get_random_id() -> identity::PublicKey {
identity::Keypair::generate_ed25519().public()
}
/// Builds a swarm on top of an in-memory transport (noise + mplex) whose
/// behaviour records the `inject_*` calls it receives.
fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
where
T: ProtocolsHandler + Clone,
T::OutEvent: Clone,
O: Send + 'static
{
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
let transport = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
}
// An incoming connection limit set on the builder must reach the network.
#[test]
fn test_build_swarm() {
let id = get_random_id();
let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
let behaviour = DummyBehaviour {};
let swarm = SwarmBuilder::new(transport, behaviour, id.into())
.incoming_connection_limit(4)
.build();
assert_eq!(swarm.network.incoming_limit(), Some(4));
}
// Without configuration the network has no incoming connection limit.
#[test]
fn test_build_swarm_with_max_listeners_none() {
let id = get_random_id();
let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, id.into()).build();
assert!(swarm.network.incoming_limit().is_none())
}
// Establishes several connections between two swarms, has one swarm ban the
// other (which must close all connections), lifts the ban and reconnects.
// Throughout, `inject_connected` / `inject_disconnected` must be called
// exactly once per peer, for the first established and the last closed
// connection respectively.
#[test]
fn test_connect_disconnect_ban() {
// Keep connections alive; otherwise they would close due to inactivity.
let mut handler_proto = DummyProtocolsHandler::default();
handler_proto.keep_alive = KeepAlive::Yes;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();
// Test execution state: connecting --> disconnecting --> connecting.
enum State {
Connecting,
Disconnecting,
}
let swarm1_id = Swarm::local_peer_id(&swarm1).clone();
let mut banned = false;
let mut unbanned = false;
let num_connections = 10;
// Initial round of connections: swarm1 dials swarm2.
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.len() == 0);
assert!(s.behaviour.inject_disconnected.len() == 0);
}
// Once both sides have seen all connections, move on.
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_established.len() == num_connections
}) {
// Second time around (after the ban was lifted): done.
if banned {
return Poll::Ready(())
}
Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
banned = true;
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_closed.len() < num_connections {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
} else {
assert_eq!(s.behaviour.inject_disconnected.len(), 1);
}
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
// Once both sides have seen all connections close, lift the ban
// and reconnect, this time dialing from swarm2.
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_closed.len() == num_connections
}) {
if unbanned {
return Poll::Ready(())
}
Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
unbanned = true;
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
// Keep polling as long as at least one swarm makes progress.
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
}