use crate::{
ConnectedPoint,
PeerId,
connection::{
self,
Connected,
Connection,
ConnectionId,
ConnectionLimit,
ConnectionError,
ConnectionHandler,
ConnectionInfo,
IncomingInfo,
IntoConnectionHandler,
OutgoingInfo,
Substream,
PendingConnectionError,
manager::{self, Manager, ManagerConfig},
},
muxing::StreamMuxer,
};
use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, hash::Hash, num::NonZeroU32, task::Context, task::Poll};
/// A pool of connections, tracking both pending and established connections
/// together with the limits that apply to them.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId> {
/// Identity of the local node. A remote that presents this same ID is
/// rejected while the connection is still pending (see `add_pending`).
local_id: TPeerId,
/// The configured connection limits.
limits: PoolLimits,
/// The connection manager to which pending and established connections
/// are handed and which is polled for events.
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo>,
/// Established connections, grouped by peer; each connection ID maps to
/// the endpoint of that connection.
established: FnvHashMap<TPeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
/// Pending connections: the endpoint and, if known, the peer that is
/// expected on the other side.
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
/// Connections removed by `disconnect` that still have to be reported as
/// `PoolEvent::ConnectionClosed` by `poll`.
disconnected: Vec<Disconnected<TConnInfo>>,
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
    for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
{
    /// Formats the pool for debugging. Only the configured limits are shown,
    /// so no `Debug` bounds are needed on the type parameters.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Pool");
        out.field("limits", &self.limits);
        out.finish()
    }
}
/// `Pool` is declared `Unpin` unconditionally, i.e. without requiring any of
/// its type parameters to be `Unpin`.
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {}
/// Event produced by `Pool::poll`.
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
/// A pending connection became established and was added to the pool.
ConnectionEstablished {
/// Handle to the newly established connection.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
/// Number of established connections to the same peer, counting the
/// new one.
num_established: NonZeroU32,
},
/// An established connection was closed and removed from the pool.
ConnectionClosed {
/// ID of the closed connection.
id: ConnectionId,
/// Information about the closed connection.
connected: Connected<TConnInfo>,
/// The error reported for the connection, if any. `poll` sets this to
/// `None` for connections closed through `Pool::disconnect`.
error: Option<ConnectionError<THandlerErr>>,
/// The pool that emitted the event.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
/// Number of connections to the same peer still counted as established.
num_established: u32,
},
/// A pending connection could not be established.
PendingConnectionError {
/// ID of the failed connection.
id: ConnectionId,
/// Endpoint of the failed connection.
endpoint: ConnectedPoint,
/// Why the connection failed.
error: PendingConnectionError<TTransErr>,
/// The handler that was meant for the connection. `poll` sets this to
/// `None` when the connection was dropped for exceeding the
/// per-peer established limit.
handler: Option<THandler>,
/// The peer that was expected on the other side, if one was recorded.
peer: Option<TPeerId>,
/// The pool that emitted the event.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
},
/// An established connection produced an event.
ConnectionEvent {
/// Handle to the connection that produced the event.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
/// The produced event.
event: TOutEvent,
},
/// The endpoint of an established connection changed.
AddressChange {
/// Handle to the affected connection.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
/// The endpoint now recorded for the connection.
new_endpoint: ConnectedPoint,
/// The endpoint previously associated with the connection.
old_endpoint: ConnectedPoint,
},
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TOutEvent: fmt::Debug,
TTransErr: fmt::Debug,
THandlerErr: fmt::Debug,
TConnInfo: fmt::Debug,
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
PoolEvent::ConnectionEstablished { ref connection, .. } => {
f.debug_tuple("PoolEvent::ConnectionEstablished")
.field(connection)
.finish()
},
PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => {
f.debug_struct("PoolEvent::ConnectionClosed")
.field("id", id)
.field("connected", connected)
.field("error", error)
.finish()
},
PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
f.debug_struct("PoolEvent::PendingConnectionError")
.field("id", id)
.field("error", error)
.finish()
},
PoolEvent::ConnectionEvent { ref connection, ref event } => {
f.debug_struct("PoolEvent::ConnectionEvent")
.field("conn_info", connection.info())
.field("event", event)
.finish()
},
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
f.debug_struct("PoolEvent::AddressChange")
.field("conn_info", connection.info())
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish()
},
}
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TPeerId: Eq + Hash,
{
pub fn new(
local_id: TPeerId,
manager_config: ManagerConfig,
limits: PoolLimits
) -> Self {
Pool {
local_id,
limits,
manager: Manager::new(manager_config),
established: Default::default(),
pending: Default::default(),
disconnected: Vec::new(),
}
}
/// Returns the connection limits this pool was configured with.
pub fn limits(&self) -> &PoolLimits {
&self.limits
}
/// Registers a pending incoming connection, driven by `future`.
///
/// No expected peer is recorded for incoming connections.
///
/// # Errors
///
/// Returns a `ConnectionLimit` error, without registering anything, if the
/// number of pending incoming connections has reached `max_incoming`.
pub fn add_incoming<TFut, TMuxer>(
    &mut self,
    future: TFut,
    handler: THandler,
    info: IncomingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
where
    TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
    TFut: Future<
        Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
    > + Send + 'static,
    THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
    THandler::Handler: ConnectionHandler<
        Substream = Substream<TMuxer>,
        InEvent = TInEvent,
        OutEvent = TOutEvent,
        Error = THandlerErr
    > + Send + 'static,
    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
    TTransErr: error::Error + Send + 'static,
    THandlerErr: error::Error + Send + 'static,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send + 'static,
    TPeerId: Clone + Send + 'static,
{
    // Enforce the limit first; the pending count is only computed when a
    // limit is actually configured.
    self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
    let endpoint = info.to_connected_point();
    let id = self.add_pending(future, handler, endpoint, None);
    Ok(id)
}
/// Registers a pending outgoing connection, driven by `future`.
///
/// If `info` names a peer, that peer is recorded as the expected remote
/// for the connection.
///
/// # Errors
///
/// Returns a `ConnectionLimit` error, without registering anything, if the
/// number of pending outgoing connections has reached `max_outgoing`, or
/// if the number of pending outgoing connections to the named peer has
/// reached `max_outgoing_per_peer`.
pub fn add_outgoing<TFut, TMuxer>(
    &mut self,
    future: TFut,
    handler: THandler,
    info: OutgoingInfo<'_, TPeerId>,
) -> Result<ConnectionId, ConnectionLimit>
where
    TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
    TFut: Future<
        Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
    > + Send + 'static,
    THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
    THandler::Handler: ConnectionHandler<
        Substream = Substream<TMuxer>,
        InEvent = TInEvent,
        OutEvent = TOutEvent,
        Error = THandlerErr
    > + Send + 'static,
    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
    TTransErr: error::Error + Send + 'static,
    THandlerErr: error::Error + Send + 'static,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send + 'static,
    TPeerId: Clone + Send + 'static,
{
    // Global outgoing limit first, then the per-peer one when the target
    // peer is known.
    self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
    if let Some(peer) = info.peer_id {
        self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?;
    }
    let endpoint = info.to_connected_point();
    let expected_peer = info.peer_id.cloned();
    let id = self.add_pending(future, handler, endpoint, expected_peer);
    Ok(id)
}
fn add_pending<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
endpoint: ConnectedPoint,
peer: Option<TPeerId>,
) -> ConnectionId
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
let future = future.and_then({
let endpoint = endpoint.clone();
let expected_peer = peer.clone();
let local_id = self.local_id.clone();
move |(info, muxer)| {
if let Some(peer) = expected_peer {
if &peer != info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}
}
if &local_id == info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}
let connected = Connected { info, endpoint };
future::ready(Ok((connected, muxer)))
}
});
let id = self.manager.add_pending(future, handler);
self.pending.insert(id, (endpoint, peer));
id
}
/// Adds an already established connection to the pool.
///
/// The connection is handed to the manager and recorded under the peer
/// identified by `i`.
///
/// # Errors
///
/// Returns a `ConnectionLimit` error, without adding the connection, if the
/// peer already has `max_established_per_peer` (or more) established
/// connections. This is the same `current >= limit` rule that `poll` applies
/// to connections that become established on their own.
pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected<TConnInfo>)
    -> Result<ConnectionId, ConnectionLimit>
where
    THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
    THandler::Handler: ConnectionHandler<
        Substream = connection::Substream<TMuxer>,
        InEvent = TInEvent,
        OutEvent = TOutEvent,
        Error = THandlerErr
    > + Send + 'static,
    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
    TTransErr: error::Error + Send + 'static,
    THandlerErr: error::Error + Send + 'static,
    TInEvent: Send + 'static,
    TOutEvent: Send + 'static,
    TMuxer: StreamMuxer + Send + Sync + 'static,
    TMuxer::OutboundSubstream: Send + 'static,
    TConnInfo: Clone + Send + 'static,
    TPeerId: Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>,
{
    // Reject only once the peer has reached the limit. The previous
    // `limit >= current` comparison was inverted: it refused connections
    // while below the limit and admitted them once it was exceeded.
    self.limits.check_established(|| self.num_peer_established(i.peer_id()))?;
    let id = self.manager.add(c, i.clone());
    self.established.entry(i.peer_id().clone()).or_default().insert(id, i.endpoint);
    Ok(id)
}
/// Looks up the connection with the given ID.
///
/// Returns a pending or an established handle depending on the state the
/// manager reports, or `None` if the manager has no such connection.
pub fn get(&mut self, id: ConnectionId)
    -> Option<PoolConnection<'_, TInEvent, TConnInfo, TPeerId>>
{
    let connection = match self.manager.entry(id)? {
        manager::Entry::Established(entry) =>
            PoolConnection::Established(EstablishedConnection { entry }),
        manager::Entry::Pending(entry) =>
            PoolConnection::Pending(PendingConnection {
                entry,
                pending: &mut self.pending,
            }),
    };
    Some(connection)
}
/// Looks up the established connection with the given ID.
///
/// Returns `None` if there is no such connection or it is still pending.
pub fn get_established(&mut self, id: ConnectionId)
    -> Option<EstablishedConnection<'_, TInEvent, TConnInfo>>
{
    self.get(id).and_then(|connection| match connection {
        PoolConnection::Established(established) => Some(established),
        PoolConnection::Pending(_) => None,
    })
}
/// Looks up the pending outgoing connection with the given ID.
///
/// Returns `None` if the ID is not pending or belongs to an incoming
/// (listener-side) connection.
///
/// # Panics
///
/// Panics if `self.pending` lists the connection as a pending dial but the
/// manager does not hold a pending entry for it.
pub fn get_outgoing(&mut self, id: ConnectionId)
    -> Option<PendingConnection<'_, TInEvent, TConnInfo, TPeerId>>
{
    let is_pending_dial = match self.pending.get(&id) {
        Some((ConnectedPoint::Dialer { .. }, _peer)) => true,
        _ => false,
    };
    if !is_pending_dial {
        return None
    }
    match self.manager.entry(id) {
        Some(manager::Entry::Pending(entry)) => Some(PendingConnection {
            entry,
            pending: &mut self.pending,
        }),
        _ => unreachable!("by consistency of `self.pending` with `self.manager`")
    }
}
/// Returns `true` if the pool has an entry for `id` among its established
/// connections.
pub fn is_connected(&self, id: &TPeerId) -> bool {
self.established.contains_key(id)
}
/// Returns the number of distinct peers that have an entry among the
/// established connections (peers, not connections).
pub fn num_connected(&self) -> usize {
self.established.len()
}
/// Drops all connections to `peer`.
///
/// Established connections are removed from the manager and queued so that
/// `poll` reports a `PoolEvent::ConnectionClosed` for each of them. Pending
/// connections whose expected peer is `peer` are aborted and forgotten
/// without being queued.
pub fn disconnect(&mut self, peer: &TPeerId) {
    if let Some(conns) = self.established.get(peer) {
        // The queue is popped from the back in `poll`, so the counter goes
        // up here: the last entry pushed is reported first and carries the
        // highest remaining count, the first entry pushed carries zero.
        let mut remaining = 0;
        for &id in conns.keys() {
            if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
                let connected = e.remove();
                self.disconnected.push(Disconnected {
                    id,
                    connected,
                    num_established: remaining,
                });
                remaining += 1;
            }
        }
    }
    self.established.remove(peer);

    // Abort matching pending connections; the IDs are collected first
    // because `self.pending` cannot be modified while it is iterated.
    let mut aborted = Vec::new();
    for (&id, (_endpoint, expected)) in &self.pending {
        if Some(peer) != expected.as_ref() {
            continue
        }
        if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) {
            e.abort();
            aborted.push(id);
        }
    }
    for id in aborted {
        self.pending.remove(&id);
    }
}
/// Returns the total number of established connections across all peers.
pub fn num_established(&self) -> usize {
    self.established.values().map(|conns| conns.len()).sum()
}
/// Returns the number of pending connections, incoming and outgoing.
pub fn num_pending(&self) -> usize {
    self.pending.len()
}
/// Returns the number of established connections to `peer`, or zero if the
/// pool has no entry for it.
pub fn num_peer_established(&self, peer: &TPeerId) -> usize {
    match self.established.get(peer) {
        Some(conns) => conns.len(),
        None => 0,
    }
}
/// Returns the number of pending outgoing connections whose expected peer
/// is `peer`. Pending dials without a recorded peer are not counted.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
    self.iter_pending_outgoing()
        .filter_map(|info| info.peer_id)
        .filter(|candidate| *candidate == peer)
        .count()
}
/// Returns an iterator over handles to the established connections of
/// `peer`.
///
/// The connection IDs are copied into a small buffer up front, so the
/// returned iterator is free to borrow the pool mutably.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
    -> EstablishedConnectionIter<'a,
        impl Iterator<Item = ConnectionId>,
        TInEvent,
        TOutEvent,
        THandler,
        TTransErr,
        THandlerErr,
        TConnInfo,
        TPeerId>
{
    let snapshot: SmallVec<[ConnectionId; 10]> = self
        .iter_peer_established_info(peer)
        .map(|(&id, _endpoint)| id)
        .collect();
    EstablishedConnectionIter { pool: self, ids: snapshot.into_iter() }
}
/// Returns an iterator over the pending incoming connections, i.e. those
/// pending connections whose endpoint is a listener.
pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
    self.iter_pending_info().filter_map(|(_, endpoint, _)| {
        if let ConnectedPoint::Listener { local_addr, send_back_addr } = endpoint {
            Some(IncomingInfo { local_addr, send_back_addr })
        } else {
            None
        }
    })
}
pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_, TPeerId>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, ref peer_id)| {
match endpoint {
ConnectedPoint::Listener { .. } => None,
ConnectedPoint::Dialer { address } =>
Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
}
})
}
/// Returns an iterator over the ID and endpoint of every established
/// connection to `peer`; the iterator is empty if the pool has no entry
/// for the peer.
pub fn iter_peer_established_info(&self, peer: &TPeerId)
    -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
{
    if let Some(conns) = self.established.get(peer) {
        Either::Left(conns.iter())
    } else {
        Either::Right(std::iter::empty())
    }
}
/// Returns an iterator over the ID, endpoint and (optional) expected peer
/// of every pending connection.
pub fn iter_pending_info(&self)
    -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<TPeerId>)> + '_
{
    self.pending.iter().map(|(id, recorded)| {
        let (endpoint, expected) = recorded;
        (id, endpoint, expected)
    })
}
/// Returns an iterator over the IDs of all peers that have an entry among
/// the established connections.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a TPeerId> + 'a {
self.established.keys()
}
/// Polls the pool for the next event.
///
/// Connections queued by `disconnect` are reported first, one per call, as
/// `PoolEvent::ConnectionClosed` with `error: None`. After that the manager
/// is polled and its events are translated into `PoolEvent`s while the
/// pool's own bookkeeping (`pending`, `established`) is updated. Returns
/// `Poll::Pending` when the manager has nothing ready.
///
/// # Panics
///
/// Panics if a per-peer connection count does not fit into a `u32`, or if
/// the pool's bookkeeping disagrees with the manager (the `unreachable!` /
/// `expect` branches). With debug assertions enabled it also panics if a
/// newly established connection carries the local peer ID or a peer ID
/// different from the expected one.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
> where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
TPeerId: Clone
{
// Report connections removed by `disconnect`. The body returns on the
// first popped entry, so at most one is reported per call.
while let Some(Disconnected {
id, connected, num_established
}) = self.disconnected.pop() {
return Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
num_established,
error: None,
pool: self,
})
}
// Translate manager events until one yields a pool event or the manager
// is pending.
loop {
let item = match self.manager.poll(cx) {
Poll::Ready(item) => item,
Poll::Pending => return Poll::Pending,
};
match item {
manager::Event::PendingConnectionError { id, error, handler } => {
// Only reported if the pool still tracks the connection as pending;
// otherwise the event is dropped and the loop continues.
if let Some((endpoint, peer)) = self.pending.remove(&id) {
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint,
error,
handler: Some(handler),
peer,
pool: self
})
}
},
manager::Event::ConnectionClosed { id, connected, error } => {
// Forget the connection and count what is left for the peer.
let num_established =
if let Some(conns) = self.established.get_mut(connected.peer_id()) {
conns.remove(&id);
u32::try_from(conns.len()).unwrap()
} else {
0
};
// Drop the peer's entry once its last connection is gone.
if num_established == 0 {
self.established.remove(connected.peer_id());
}
return Poll::Ready(PoolEvent::ConnectionClosed {
id, connected, error, num_established, pool: self
})
}
manager::Event::ConnectionEstablished { entry } => {
let id = entry.id();
// Only connections the pool tracks as pending are processed; for any
// other ID the event is dropped and the loop continues.
if let Some((endpoint, peer)) = self.pending.remove(&id) {
// Enforce the per-peer limit on established connections.
let established = &self.established;
let current = || established.get(entry.connected().peer_id())
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
// Over the limit: remove the connection from the manager and report
// it as a failed pending connection. No handler is available here.
let connected = entry.remove();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
error: PendingConnectionError::ConnectionLimit(e),
handler: None,
peer,
pool: self
})
}
// The peer ID checks are performed by the future built in
// `add_pending`; they are re-verified here in debug builds only.
if cfg!(debug_assertions) {
if &self.local_id == entry.connected().peer_id() {
panic!("Unexpected local peer ID for remote.");
}
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
panic!("Unexpected peer ID mismatch.");
}
}
}
// Record the connection as established for its peer.
let peer = entry.connected().peer_id().clone();
let conns = self.established.entry(peer).or_default();
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
.expect("n + 1 is always non-zero; qed");
conns.insert(id, endpoint);
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEstablished {
connection, num_established
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
}
},
manager::Event::ConnectionEvent { entry, event } => {
// Forward the event together with a handle to its connection.
let id = entry.id();
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEvent {
connection,
event,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
let id = entry.id();
// Overwrite the endpoint recorded for this connection.
match self.established.get_mut(entry.connected().peer_id()) {
Some(list) => *list.get_mut(&id)
.expect("state inconsistency: entry is `EstablishedEntry` but absent \
from `established`") = new_endpoint.clone(),
None => unreachable!("since `entry` is an `EstablishedEntry`.")
};
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
}
}
}
}
/// A connection in a `Pool`, as returned by `Pool::get`.
pub enum PoolConnection<'a, TInEvent, TConnInfo, TPeerId> {
/// The connection is still pending.
Pending(PendingConnection<'a, TInEvent, TConnInfo, TPeerId>),
/// The connection is established.
Established(EstablishedConnection<'a, TInEvent, TConnInfo>),
}
/// A handle to a pending connection in a `Pool`.
pub struct PendingConnection<'a, TInEvent, TConnInfo, TPeerId> {
    /// The manager's entry for the connection.
    entry: manager::PendingEntry<'a, TInEvent, TConnInfo>,
    /// The pool's table of pending connections, which holds the endpoint and
    /// expected peer of this connection.
    pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
}
impl<TInEvent, TConnInfo, TPeerId>
    PendingConnection<'_, TInEvent, TConnInfo, TPeerId>
{
    /// Returns the ID of the connection.
    pub fn id(&self) -> ConnectionId {
        self.entry.id()
    }

    /// Returns the peer expected on the other side, if one was recorded.
    ///
    /// # Panics
    ///
    /// Panics if the pool's pending table has no record of this connection.
    pub fn peer_id(&self) -> &Option<TPeerId> {
        let (_endpoint, expected) = self.pending
            .get(&self.entry.id())
            .expect("`entry` is a pending entry");
        expected
    }

    /// Returns the endpoint recorded for the connection.
    ///
    /// # Panics
    ///
    /// Panics if the pool's pending table has no record of this connection.
    pub fn endpoint(&self) -> &ConnectedPoint {
        let (endpoint, _expected) = self.pending
            .get(&self.entry.id())
            .expect("`entry` is a pending entry");
        endpoint
    }

    /// Aborts the connection attempt: the pool's record is removed first,
    /// then the manager entry is aborted.
    pub fn abort(self) {
        let PendingConnection { entry, pending } = self;
        pending.remove(&entry.id());
        entry.abort();
    }
}
/// A handle to an established connection in a `Pool`.
pub struct EstablishedConnection<'a, TInEvent, TConnInfo> {
    /// The manager's entry for the connection; every method delegates to it.
    entry: manager::EstablishedEntry<'a, TInEvent, TConnInfo>,
}

impl<TInEvent, TConnInfo> fmt::Debug
    for EstablishedConnection<'_, TInEvent, TConnInfo>
where
    TInEvent: fmt::Debug,
    TConnInfo: fmt::Debug,
{
    /// Formats the handle by showing the wrapped manager entry.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("EstablishedConnection");
        out.field("entry", &self.entry);
        out.finish()
    }
}

impl<TInEvent, TConnInfo> EstablishedConnection<'_, TInEvent, TConnInfo> {
    /// Returns the connection's `Connected` record (info plus endpoint).
    pub fn connected(&self) -> &Connected<TConnInfo> {
        self.entry.connected()
    }

    /// Returns the endpoint stored in the connection's `Connected` record.
    pub fn endpoint(&self) -> &ConnectedPoint {
        let connected = self.entry.connected();
        &connected.endpoint
    }

    /// Returns the connection info stored in the connection's `Connected`
    /// record.
    pub fn info(&self) -> &TConnInfo {
        let connected = self.entry.connected();
        &connected.info
    }
}

impl<'a, TInEvent, TConnInfo> EstablishedConnection<'a, TInEvent, TConnInfo>
where
    TConnInfo: ConnectionInfo,
{
    /// Returns the ID of the connection.
    pub fn id(&self) -> ConnectionId {
        self.entry.id()
    }

    /// Returns the ID of the remote peer, taken from the connection info.
    pub fn peer_id(&self) -> &TConnInfo::PeerId {
        let info = self.info();
        info.peer_id()
    }

    /// Passes `event` to the manager entry for delivery to the connection's
    /// handler. On failure the event is handed back in the `Err` variant.
    pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
        self.entry.notify_handler(event)
    }

    /// Polls the manager entry for readiness to accept an event via
    /// `notify_handler`; the result is forwarded unchanged.
    pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
        self.entry.poll_ready_notify_handler(cx)
    }

    /// Consumes the handle and asks the manager entry to start closing the
    /// connection.
    pub fn start_close(self) {
        self.entry.start_close()
    }
}
/// An iterator-like cursor over established connections in a `Pool`.
///
/// It cannot implement `Iterator` because each handle it yields borrows the
/// cursor (and through it the pool) mutably.
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
    /// The pool the connections live in.
    pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
    /// The IDs of the connections still to be visited.
    ids: I
}

impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
    EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
    I: Iterator<Item = ConnectionId>
{
    /// Advances to the next connection that the manager still reports as
    /// established, skipping IDs that no longer are. Returns `None` once the
    /// IDs are exhausted.
    ///
    /// # Panics
    ///
    /// Panics if the manager reports an ID as established but does not
    /// return an established entry for it.
    pub fn next<'b>(&'b mut self) -> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
    {
        let mgr = &self.pool.manager;
        let id = self.ids.find(|id| mgr.is_established(id))?;
        match self.pool.manager.entry(id) {
            Some(manager::Entry::Established(entry)) => Some(EstablishedConnection { entry }),
            _ => panic!("Established entry not found in manager.")
        }
    }

    /// Consumes the cursor and returns the IDs that were not visited yet.
    pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
        self.ids
    }

    /// Consumes the cursor and returns the first remaining connection that
    /// the manager still reports as established, if any.
    ///
    /// # Panics
    ///
    /// Panics if the manager reports an ID as established but does not
    /// return an established entry for it.
    pub fn into_first<'b>(self)
        -> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
    where 'a: 'b
    {
        let EstablishedConnectionIter { pool, mut ids } = self;
        let id = {
            let mgr = &pool.manager;
            ids.find(|id| mgr.is_established(id))?
        };
        match pool.manager.entry(id) {
            Some(manager::Entry::Established(entry)) => Some(EstablishedConnection { entry }),
            _ => panic!("Established entry not found in manager.")
        }
    }
}
/// The configurable limits of a connection pool.
///
/// A limit of `None` means "unlimited" for that category.
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
    /// Maximum number of pending outgoing connections.
    pub max_outgoing: Option<usize>,
    /// Maximum number of pending incoming connections.
    pub max_incoming: Option<usize>,
    /// Maximum number of established connections per peer.
    pub max_established_per_peer: Option<usize>,
    /// Maximum number of pending outgoing connections per peer.
    pub max_outgoing_per_peer: Option<usize>,
}

impl PoolLimits {
    /// Checks `current()` against `max_established_per_peer`.
    fn check_established<F>(&self, current: F) -> Result<(), ConnectionLimit>
    where
        F: FnOnce() -> usize
    {
        Self::check(current, self.max_established_per_peer)
    }

    /// Checks `current()` against `max_outgoing`.
    fn check_outgoing<F>(&self, current: F) -> Result<(), ConnectionLimit>
    where
        F: FnOnce() -> usize
    {
        Self::check(current, self.max_outgoing)
    }

    /// Checks `current()` against `max_incoming`.
    fn check_incoming<F>(&self, current: F) -> Result<(), ConnectionLimit>
    where
        F: FnOnce() -> usize
    {
        Self::check(current, self.max_incoming)
    }

    /// Checks `current()` against `max_outgoing_per_peer`.
    fn check_outgoing_per_peer<F>(&self, current: F) -> Result<(), ConnectionLimit>
    where
        F: FnOnce() -> usize
    {
        Self::check(current, self.max_outgoing_per_peer)
    }

    /// Fails with a `ConnectionLimit` error if a limit is set and the
    /// current count has reached (or exceeded) it.
    ///
    /// `current` is only evaluated when a limit is configured, so a
    /// potentially costly count is skipped for unlimited categories.
    fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
    where
        F: FnOnce() -> usize
    {
        match limit {
            None => Ok(()),
            Some(limit) => {
                let current = current();
                if current < limit {
                    Ok(())
                } else {
                    Err(ConnectionLimit { limit, current })
                }
            }
        }
    }
}
/// Record of a connection that `Pool::disconnect` removed from the manager
/// and that `Pool::poll` still has to report as `PoolEvent::ConnectionClosed`.
struct Disconnected<TConnInfo> {
/// ID of the removed connection.
id: ConnectionId,
/// The `Connected` record returned when the connection was removed.
connected: Connected<TConnInfo>,
/// The `num_established` value to report in the event.
num_established: u32,
}