use crate::{
Multiaddr,
Transport,
StreamMuxer,
connection::{
Connected,
ConnectedPoint,
ConnectionHandler,
ConnectionInfo,
Connection,
ConnectionId,
ConnectionLimit,
EstablishedConnection,
EstablishedConnectionIter,
IntoConnectionHandler,
PendingConnection,
Substream,
pool::Pool,
},
};
use fnv::FnvHashMap;
use smallvec::SmallVec;
use std::{
collections::hash_map,
error,
fmt,
hash::Hash,
};
use super::{Network, DialingOpts};
/// The state of a peer as classified by `Peer::new` for a given `Network`.
///
/// Each non-`Local` variant wraps a view that holds a mutable borrow of the
/// network together with the peer's ID.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>
{
/// The network's connection pool reports the peer as connected. Dialing
/// attempts may be ongoing at the same time (see `ConnectedPeer::is_dialing`).
Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
/// The network's dialing table has an entry for the peer. `Peer::new` only
/// selects this variant when the pool does not report the peer as connected,
/// but the view itself can still be connected (see `DialingPeer::is_connected`).
Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
/// The peer is neither reported as connected by the pool nor present in the
/// dialing table.
Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
/// The peer ID equals the network's `local_peer_id`.
Local,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
    Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId>,
    TPeerId: fmt::Debug + Eq + Hash,
{
    /// Formats the peer as a struct named after the variant.
    ///
    /// Variants carrying a view print it in a single `peer` field; `Local`
    /// is printed without any fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select the struct name and the optional payload first so that the
        // formatting itself is written only once.
        let (label, payload): (&str, Option<&dyn fmt::Debug>) = match self {
            Peer::Connected(peer) => ("Connected", Some(peer)),
            Peer::Dialing(peer) => ("Dialing", Some(peer)),
            Peer::Disconnected(peer) => ("Disconnected", Some(peer)),
            Peer::Local => ("Local", None),
        };

        let mut out = f.debug_struct(label);
        if let Some(peer) = payload {
            out.field("peer", peer);
        }
        out.finish()
    }
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
    Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    /// Classifies `peer_id` against the current state of `network`.
    ///
    /// The checks run in a fixed order and the first match wins:
    ///
    /// 1. the ID equals the network's `local_peer_id` => `Peer::Local`
    /// 2. the connection pool reports the peer as connected => `Peer::Connected`
    /// 3. the dialing table has an entry for the peer => `Peer::Dialing`
    /// 4. otherwise => `Peer::Disconnected`
    pub(super) fn new(
        network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
        peer_id: TPeerId
    ) -> Self {
        if peer_id == network.local_peer_id {
            Peer::Local
        } else if network.pool.is_connected(&peer_id) {
            Self::connected(network, peer_id)
        } else if network.dialing.contains_key(&peer_id) {
            Self::dialing(network, peer_id)
        } else {
            Self::disconnected(network, peer_id)
        }
    }

    /// Wraps the network and peer ID in the `Disconnected` variant.
    fn disconnected(
        network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
        peer_id: TPeerId
    ) -> Self {
        let view = DisconnectedPeer { network, peer_id };
        Peer::Disconnected(view)
    }

    /// Wraps the network and peer ID in the `Connected` variant.
    fn connected(
        network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
        peer_id: TPeerId
    ) -> Self {
        let view = ConnectedPeer { network, peer_id };
        Peer::Connected(view)
    }

    /// Wraps the network and peer ID in the `Dialing` variant.
    fn dialing(
        network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
        peer_id: TPeerId
    ) -> Self {
        let view = DialingPeer { network, peer_id };
        Peer::Dialing(view)
    }
}
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
pub fn is_connected(&self) -> bool {
match self {
Peer::Connected(..) => true,
Peer::Dialing(peer) => peer.is_connected(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}
pub fn is_dialing(&self) -> bool {
match self {
Peer::Dialing(_) => true,
Peer::Connected(peer) => peer.is_dialing(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}
pub fn is_disconnected(&self) -> bool {
match self {
Peer::Disconnected(..) => true,
_ => false
}
}
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
ConnectionLimit
>
where
I: IntoIterator<Item = Multiaddr>,
{
let (peer_id, network) = match self {
Peer::Connected(p) => (p.peer_id, p.network),
Peer::Dialing(p) => (p.peer_id, p.network),
Peer::Disconnected(p) => (p.peer_id, p.network),
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
};
let id = network.dial_peer(DialingOpts {
peer: peer_id.clone(),
handler,
address,
remaining: remaining.into_iter().collect(),
})?;
Ok((id, DialingPeer { network, peer_id }))
}
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Connected(peer) => Some(peer),
Peer::Dialing(peer) => peer.into_connected(),
Peer::Disconnected(..) => None,
Peer::Local => None,
}
}
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Dialing(peer) => Some(peer),
Peer::Connected(peer) => peer.into_dialing(),
Peer::Disconnected(..) => None,
Peer::Local => None
}
}
pub fn into_disconnected(self) -> Option<
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Disconnected(peer) => Some(peer),
_ => None,
}
}
}
/// View of a peer that `Peer::new` classified as connected, i.e. the
/// network's connection pool reported it as connected at that time.
pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
/// The network this view operates on.
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
/// The ID of the peer this view refers to.
peer_id: TPeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Connected(self)
}
pub fn connection<'b>(&'b mut self, id: ConnectionId)
-> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
{
self.network.pool.get_established(id)
}
pub fn num_connections(&self) -> usize {
self.network.pool.num_peer_established(&self.peer_id)
}
pub fn is_dialing(&self) -> bool {
self.network.dialing.contains_key(&self.peer_id)
}
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
if self.network.dialing.contains_key(&self.peer_id) {
Some(DialingPeer { network: self.network, peer_id: self.peer_id })
} else {
None
}
}
pub fn connections<'b>(&'b mut self) ->
EstablishedConnectionIter<'b,
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId>
{
self.network.pool.iter_peer_established(&self.peer_id)
}
pub fn some_connection<'b>(&'b mut self)
-> EstablishedConnection<'b, TInEvent, TConnInfo>
{
self.connections()
.into_first()
.expect("By `Peer::new` and the definition of `ConnectedPeer`.")
}
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
    ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + fmt::Debug,
{
    /// Prints the peer ID, the pool's established-connection info for the
    /// peer, and the peer's entry in the dialing table (if any).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let established = self.network.pool.iter_peer_established_info(&self.peer_id);
        let attempts = self.network.dialing.get(&self.peer_id);

        let mut out = f.debug_struct("ConnectedPeer");
        out.field("peer_id", &self.peer_id);
        out.field("established", &established);
        out.field("attempts", &attempts);
        out.finish()
    }
}
/// View of a peer with ongoing dialing attempts, as produced by `Peer::new`
/// (entry in the network's dialing table) or returned by `Peer::dial`.
///
/// The peer may be connected at the same time; see `is_connected`.
pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
/// The network this view operates on.
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
/// The ID of the peer this view refers to.
peer_id: TPeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Dialing(self)
}
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}
pub fn is_connected(&self) -> bool {
self.network.pool.is_connected(&self.peer_id)
}
pub fn into_connected(self)
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
{
if self.is_connected() {
Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
} else {
None
}
}
pub fn attempt<'b>(&'b mut self, id: ConnectionId)
-> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
{
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) {
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
if let Some(inner) = self.network.pool.get_outgoing(id) {
return Some(DialingAttempt { pos, inner, attempts })
}
}
}
None
}
pub fn num_attempts(&self) -> usize {
self.network.pool.num_peer_outgoing(&self.peer_id)
}
pub fn attempts<'b>(&'b mut self)
-> DialingAttemptIter<'b,
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId>
{
DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing)
}
pub fn some_attempt<'b>(&'b mut self)
-> DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>
{
self.attempts()
.into_first()
.expect("By `Peer::new` and the definition of `DialingPeer`.")
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
    DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + fmt::Debug,
{
    /// Prints the peer ID, the pool's established-connection info for the
    /// peer, and the peer's entry in the dialing table (if any).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let established = self.network.pool.iter_peer_established_info(&self.peer_id);
        let attempts = self.network.dialing.get(&self.peer_id);

        let mut out = f.debug_struct("DialingPeer");
        out.field("peer_id", &self.peer_id);
        out.field("established", &established);
        out.field("attempts", &attempts);
        out.finish()
    }
}
/// View of a peer that is neither connected nor being dialed, as produced
/// by `Peer::new` or by the `disconnect` methods of the other views.
pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
/// The ID of the peer this view refers to.
peer_id: TPeerId,
/// The network this view operates on.
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
    DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: fmt::Debug,
{
    /// Prints only the peer ID; the borrowed network is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("DisconnectedPeer");
        out.field("peer_id", &self.peer_id);
        out.finish()
    }
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
Peer::Disconnected(self)
}
pub fn set_connected<TMuxer>(
self,
connected: Connected<TConnInfo>,
connection: Connection<TMuxer, THandler::Handler>,
) -> Result<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
ConnectionLimit
> where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: Send + 'static,
TTrans::Error: Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + fmt::Debug + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
{
if connected.peer_id() != &self.peer_id {
panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id(), self.peer_id)
}
self.network.pool.add(connection, connected)
.map(|_id| ConnectedPeer {
network: self.network,
peer_id: self.peer_id,
})
}
}
/// Bookkeeping for one dialing attempt, stored per peer in the network's
/// dialing table.
#[derive(Debug, Clone)]
pub(super) struct DialingState {
/// The connection ID of the attempt together with an address
/// (presumably the one currently being dialed — confirm against `Network`).
pub(super) current: (ConnectionId, Multiaddr),
/// Further addresses recorded for this attempt; `DialingAttempt::add_address`
/// appends to this list. NOTE(review): how the list is consumed is not
/// visible in this file.
pub(super) remaining: Vec<Multiaddr>,
}
/// A single dialing attempt to a peer: a pending outgoing connection in the
/// pool paired with its `DialingState` in the dialing table.
pub struct DialingAttempt<'a, TInEvent, TConnInfo, TPeerId> {
/// The pending connection obtained from the pool for this attempt.
inner: PendingConnection<'a, TInEvent, TConnInfo, TPeerId>,
/// The peer's occupied entry in the dialing table, holding all of the
/// peer's attempt states.
attempts: hash_map::OccupiedEntry<'a, TPeerId, SmallVec<[DialingState; 10]>>,
/// Index of this attempt's state within `attempts`.
pos: usize,
}
impl<'a, TInEvent, TConnInfo, TPeerId>
DialingAttempt<'a, TInEvent, TConnInfo, TPeerId>
{
pub fn id(&self) -> ConnectionId {
self.inner.id()
}
pub fn peer_id(&self) -> &TPeerId {
self.attempts.key()
}
pub fn address(&self) -> &Multiaddr {
match self.inner.endpoint() {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { .. } => unreachable!("by definition of a `DialingAttempt`.")
}
}
pub fn abort(mut self) {
self.attempts.get_mut().remove(self.pos);
if self.attempts.get().is_empty() {
self.attempts.remove();
}
self.inner.abort();
}
pub fn add_address(&mut self, addr: Multiaddr) {
let remaining = &mut self.attempts.get_mut()[self.pos].remaining;
if remaining.iter().all(|a| a != &addr) {
remaining.push(addr);
}
}
}
/// An iterator over the dialing attempts to a single peer.
///
/// Items are handed out through the inherent `next` method, each borrowing
/// from the iterator itself.
pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
/// The ID of the peer whose attempts are iterated.
peer_id: &'a TPeerId,
/// The connection pool that is queried for the pending outgoing connections.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
/// The dialing table holding the per-peer attempt states.
dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
/// Index of the next attempt to yield.
pos: usize,
/// Number of attempts the iterator expects in the peer's list; initialised
/// in `new` from the list's length.
end: usize,
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
    DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
    TConnInfo: ConnectionInfo<PeerId = TPeerId>,
    TPeerId: Eq + Hash + Clone,
{
    /// Creates an iterator over the dialing attempts of `peer_id`.
    ///
    /// The iterator starts at the first attempt and covers as many attempts
    /// as the peer's entry in `dialing` lists at this point (none if the
    /// peer has no entry).
    fn new(
        peer_id: &'a TPeerId,
        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
        dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
    ) -> Self {
        let end = dialing.get(peer_id).map_or(0, |conns| conns.len());
        Self { pos: 0, end, pool, dialing, peer_id }
    }

    /// Obtains the next dialing attempt, if any.
    ///
    /// Returns `None` once all attempts have been visited, or when the pool
    /// has no outgoing pending connection for the attempt at the current
    /// position (in which case the position is not advanced).
    ///
    /// The returned attempt borrows the iterator, so it has to be dropped or
    /// consumed (e.g. via `DialingAttempt::abort`) before `next` can be
    /// called again.
    pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>> {
        // If the peer's list got shorter, the attempt handed out by the
        // previous call was aborted and its state removed. The following
        // elements moved down by one, so iteration has to resume from the
        // previous position; otherwise one attempt would be skipped and the
        // stale `end` could lead to an out-of-bounds index below.
        let len = self.dialing.get(self.peer_id).map_or(0, |conns| conns.len());
        if len < self.end {
            self.end = len;
            self.pos = self.pos.saturating_sub(1);
        }

        if self.pos >= self.end {
            return None
        }

        if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
            // In bounds: `pos < end` and `end` never exceeds the list length.
            let id = attempts.get()[self.pos].current.0;
            if let Some(inner) = self.pool.get_outgoing(id) {
                let conn = DialingAttempt { pos: self.pos, inner, attempts };
                self.pos += 1;
                return Some(conn)
            }
        }

        None
    }

    /// Consumes the iterator and returns the attempt at its current
    /// position, if there is one.
    ///
    /// Unlike `next`, the returned attempt may live for as long as the
    /// borrows held by the iterator.
    pub fn into_first<'b>(self)
        -> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
    where 'a: 'b
    {
        if self.pos == self.end {
            return None
        }

        if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
            let id = attempts.get()[self.pos].current.0;
            if let Some(inner) = self.pool.get_outgoing(id) {
                return Some(DialingAttempt { pos: self.pos, inner, attempts })
            }
        }

        None
    }
}