pub use crate::upgrade::Version;
use crate::{
ConnectedPoint,
ConnectionInfo,
Negotiated,
transport::{
Transport,
TransportError,
ListenerEvent,
and_then::AndThen,
},
muxing::StreamMuxer,
upgrade::{
self,
OutboundUpgrade,
InboundUpgrade,
apply_inbound,
apply_outbound,
UpgradeError,
OutboundUpgradeApply,
InboundUpgradeApply
}
};
use futures::{prelude::*, ready};
use multiaddr::Multiaddr;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
pub struct Builder<T> {
inner: T,
version: upgrade::Version,
}
impl<T> Builder<T>
where
T: Transport,
T::Error: 'static,
{
pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
Builder { inner, version }
}
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Builder<
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
I: ConnectionInfo,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = (I, D), Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = (I, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint, version)
}
}), version)
}
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Builder<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Builder::new(Upgrade::new(self.inner, upgrade), self.version)
}
pub fn multiplex<C, M, U, I, E>(self, upgrade: U)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
}
#[pin_project::pin_project]
pub struct Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
{
#[pin]
inner: EitherUpgrade<C, U>
}
impl<C, U> Future for Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>,
Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
Error = <U as InboundUpgrade<Negotiated<C>>>::Error
>
{
type Output = <EitherUpgrade<C, U> as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Future::poll(this.inner, cx)
}
}
#[pin_project::pin_project]
pub struct Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{
info: Option<I>,
#[pin]
upgrade: EitherUpgrade<C, U>,
}
impl<C, U, I, M, E> Future for Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
{
type Output = Result<(I, M), UpgradeError<E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let m = match ready!(Future::poll(this.upgrade, cx)) {
Ok(m) => m,
Err(err) => return Poll::Ready(Err(err)),
};
let i = this.info.take().expect("Multiplex future polled after completion.");
Poll::Ready(Ok((i, m)))
}
}
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
#[derive(Debug, Copy, Clone)]
pub struct Upgrade<T, U> { inner: T, upgrade: U }
impl<T, U> Upgrade<T, U> {
pub fn new(inner: T, upgrade: U) -> Self {
Upgrade { inner, upgrade }
}
}
impl<T, C, D, U, I, E> Transport for Upgrade<T, U>
where
T: Transport<Output = (I, C)>,
T::Error: 'static,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static
{
type Output = (I, D);
type Error = TransportUpgradeError<T::Error, E>;
type Listener = ListenerStream<T::Listener, U>;
type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, I, C>;
type Dial = DialUpgradeFuture<T::Dial, U, I, C>;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.inner.dial(addr.clone())
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade))
})
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let stream = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(ListenerStream {
stream: Box::pin(stream),
upgrade: self.upgrade
})
}
}
#[derive(Debug)]
pub enum TransportUpgradeError<T, U> {
Transport(T),
Upgrade(UpgradeError<U>),
}
impl<T, U> fmt::Display for TransportUpgradeError<T, U>
where
T: fmt::Display,
U: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e),
TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e),
}
}
}
impl<T, U> Error for TransportUpgradeError<T, U>
where
T: Error + 'static,
U: Error + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
TransportUpgradeError::Transport(e) => Some(e),
TransportUpgradeError::Upgrade(e) => Some(e),
}
}
}
pub struct DialUpgradeFuture<F, U, I, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, OutboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for DialUpgradeFuture<F, U, I, C>
where
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(I, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err)),
};
let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Ok(d) => d,
Err(err) => return Poll::Ready(Err(err)),
};
let i = i.take().expect("DialUpgradeFuture polled after completion.");
return Poll::Ready(Ok((i, d)))
}
}
}
}
}
impl<F, U, I, C> Unpin for DialUpgradeFuture<F, U, I, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
}
pub struct ListenerStream<S, U> {
stream: Pin<Box<S>>,
upgrade: U
}
impl<S, U, F, I, C, D, E> Stream for ListenerStream<S, U>
where
S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
{
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, I, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
Some(Ok(event)) => {
let event = event
.map(move |future| {
ListenerUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone()))
}
})
.map_err(TransportUpgradeError::Transport);
Poll::Ready(Some(Ok(event)))
}
Some(Err(err)) => {
Poll::Ready(Some(Err(TransportUpgradeError::Transport(err))))
}
None => Poll::Ready(None)
}
}
}
impl<S, U> Unpin for ListenerStream<S, U> {
}
pub struct ListenerUpgradeFuture<F, U, I, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, InboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for ListenerUpgradeFuture<F, U, I, C>
where
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(I, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err))
};
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_inbound(c, u)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err))
};
let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
return Poll::Ready(Ok((i, d)))
}
}
}
}
}
impl<F, U, I, C> Unpin for ListenerUpgradeFuture<F, U, I, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
}