use futures::{prelude::*, future::Ready};
use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport};
use parity_send_wrapper::SendWrapper;
use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use wasm_bindgen::{JsCast, prelude::*};
use wasm_bindgen_futures::JsFuture;
/// Raw wasm-bindgen declarations for the JavaScript objects this transport drives.
///
/// Everything in this module is implemented on the JavaScript side; the Rust code in
/// this file only calls into it.
pub mod ffi {
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
extern "C" {
/// JavaScript object able to dial and to listen.
pub type Transport;
/// JavaScript object representing one open connection.
pub type Connection;
/// JavaScript object describing one batch of events produced while listening.
pub type ListenEvent;
/// JavaScript object describing one newly accepted connection.
pub type ConnectionEvent;
/// Starts dialing `multiaddr`.
///
/// The returned promise is awaited by the Rust side, and its resolved value is
/// converted into a [`Connection`]. A thrown error whose `name` is
/// `NotSupportedError` is reported to the caller as an unsupported multiaddr.
#[wasm_bindgen(method, catch)]
pub fn dial(this: &Transport, multiaddr: &str) -> Result<js_sys::Promise, JsValue>;
/// Starts listening on `multiaddr`.
///
/// Each value yielded by the returned iterator is treated as a promise whose
/// resolved value is converted into a [`ListenEvent`]. A thrown error whose `name`
/// is `NotSupportedError` is reported to the caller as an unsupported multiaddr.
#[wasm_bindgen(method, catch)]
pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;
/// Iterator over incoming data.
///
/// Each yielded value is treated as a promise. A resolved value of `null` is
/// reported as a zero-length read; any other resolved value is wrapped in a
/// `Uint8Array` and copied out.
#[wasm_bindgen(method, getter)]
pub fn read(this: &Connection) -> js_sys::Iterator;
/// Writes `data` to the connection.
///
/// The Rust side waits for the returned promise to settle before issuing the
/// next write.
#[wasm_bindgen(method, catch)]
pub fn write(this: &Connection, data: &[u8]) -> Result<js_sys::Promise, JsValue>;
/// Shuts down the connection; invoked when the Rust side closes its writer.
#[wasm_bindgen(method, catch)]
pub fn shutdown(this: &Connection) -> Result<(), JsValue>;
/// Closes the connection; invoked when the Rust-side `Connection` is dropped.
#[wasm_bindgen(method)]
pub fn close(this: &Connection);
/// Addresses that started being listened on. Each element must be a string
/// holding a multiaddr.
#[wasm_bindgen(method, getter)]
pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
/// Addresses that are no longer listened on. Each element must be a string
/// holding a multiaddr.
#[wasm_bindgen(method, getter)]
pub fn expired_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;
/// Newly accepted connections. Each element is converted into a
/// [`ConnectionEvent`].
#[wasm_bindgen(method, getter)]
pub fn new_connections(this: &ListenEvent) -> Option<Box<[JsValue]>>;
/// NOTE(review): this getter is not read anywhere in the visible part of this
/// file; presumably a promise for the following event — confirm against the
/// JavaScript implementation.
#[wasm_bindgen(method, getter)]
pub fn next_event(this: &ListenEvent) -> JsValue;
/// The accepted connection itself.
#[wasm_bindgen(method, getter)]
pub fn connection(this: &ConnectionEvent) -> Connection;
/// Multiaddr string reported as the remote address of the connection.
#[wasm_bindgen(method, getter)]
pub fn observed_addr(this: &ConnectionEvent) -> String;
/// Multiaddr string reported as the local address of the connection.
#[wasm_bindgen(method, getter)]
pub fn local_addr(this: &ConnectionEvent) -> String;
}
// Only compiled when the `websocket` feature is enabled.
#[cfg(feature = "websocket")]
#[wasm_bindgen(module = "/src/websockets.js")]
extern "C" {
/// Returns a [`Transport`] provided by the bundled `/src/websockets.js` module.
pub fn websocket_transport() -> Transport;
}
}
/// `Transport` implementation that delegates dialing and listening to a
/// JavaScript object (see [`ffi::Transport`]).
pub struct ExtTransport {
// Handle to the JavaScript transport object, kept inside a `SendWrapper`.
inner: SendWrapper<ffi::Transport>,
}
impl ExtTransport {
    /// Wraps the JavaScript `transport` object so it can be used as a libp2p transport.
    pub fn new(transport: ffi::Transport) -> Self {
        let inner = SendWrapper::new(transport);
        Self { inner }
    }
}
impl fmt::Debug for ExtTransport {
    /// Prints only the type name; the wrapped JavaScript handle is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ExtTransport")
    }
}
impl Clone for ExtTransport {
fn clone(&self) -> Self {
ExtTransport {
inner: SendWrapper::new(self.inner.clone().into()),
}
}
}
impl Transport for ExtTransport {
    type Output = Connection;
    type Error = JsErr;
    type Listener = Listen;
    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
    type Dial = Dial;

    /// Asks the JavaScript transport to listen on `addr`.
    ///
    /// A thrown `NotSupportedError` is reported as
    /// `TransportError::MultiaddrNotSupported`; any other thrown value is wrapped in
    /// `TransportError::Other`.
    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
        let iterator = match self.inner.listen_on(&addr.to_string()) {
            Ok(iterator) => iterator,
            Err(err) => {
                return Err(if is_not_supported_error(&err) {
                    TransportError::MultiaddrNotSupported(addr)
                } else {
                    TransportError::Other(JsErr::from(err))
                });
            }
        };

        Ok(Listen {
            iterator: SendWrapper::new(iterator),
            next_event: None,
            pending_events: VecDeque::new(),
        })
    }

    /// Asks the JavaScript transport to dial `addr`.
    ///
    /// Thrown values are classified exactly as in `listen_on`. On success the returned
    /// future resolves once the JavaScript promise settles.
    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
        let promise = match self.inner.dial(&addr.to_string()) {
            Ok(promise) => promise,
            Err(err) => {
                return Err(if is_not_supported_error(&err) {
                    TransportError::MultiaddrNotSupported(addr)
                } else {
                    TransportError::Other(JsErr::from(err))
                });
            }
        };

        Ok(Dial {
            inner: SendWrapper::new(promise.into()),
        })
    }
}
/// Future that resolves to a [`Connection`] once the JavaScript dial promise settles.
#[must_use = "futures do nothing unless polled"]
pub struct Dial {
// The JavaScript promise returned by `ffi::Transport::dial`, adapted to a Rust future.
inner: SendWrapper<JsFuture>,
}
impl fmt::Debug for Dial {
    /// Prints only the type name; the wrapped promise is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Dial")
    }
}
impl Future for Dial {
    type Output = Result<Connection, JsErr>;

    /// Polls the underlying promise; a resolved value becomes a `Connection`, a
    /// rejection becomes a `JsErr`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let settled = match Future::poll(Pin::new(&mut *self.inner), cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(settled) => settled,
        };

        Poll::Ready(match settled {
            Ok(raw) => Ok(Connection::new(raw.into())),
            Err(err) => Err(JsErr::from(err)),
        })
    }
}
/// Stream of listener events produced by the JavaScript transport.
#[must_use = "futures do nothing unless polled"]
pub struct Listen {
// Iterator returned by `ffi::Transport::listen_on`; each value it yields is a promise.
iterator: SendWrapper<js_sys::Iterator>,
// Promise for the event currently being waited on, if any.
next_event: Option<SendWrapper<JsFuture>>,
// Events already decoded from a `ListenEvent` but not yet handed to the caller.
pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>>,
}
impl fmt::Debug for Listen {
    /// Prints only the type name; the JavaScript handles are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Listen")
    }
}
impl Stream for Listen {
    type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>, JsErr>;

    /// Yields the next listener event.
    ///
    /// Buffered events are drained first. When the buffer is empty, the next promise is
    /// pulled from the JavaScript iterator and awaited; the resulting `ListenEvent` is
    /// expanded into new addresses, then incoming connections, then expired addresses,
    /// in that order. The stream ends when the iterator yields no further promise.
    ///
    /// # Errors
    ///
    /// Yields `Err` when the awaited promise is rejected, when a new address is not a
    /// valid multiaddr string, or when a connection's local/observed address fails to
    /// parse. A malformed *expired* address is instead surfaced as
    /// `ListenerEvent::Error`, so the remaining entries are still processed.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let Some(ev) = self.pending_events.pop_front() {
                return Poll::Ready(Some(Ok(ev)));
            }

            if self.next_event.is_none() {
                // NOTE(review): an exception thrown by the iterator is ignored here, which
                // makes the stream end below exactly as if the iterator were exhausted.
                if let Ok(ev) = self.iterator.next() {
                    if !ev.done() {
                        let promise: js_sys::Promise = ev.value().into();
                        self.next_event = Some(SendWrapper::new(promise.into()));
                    }
                }
            }

            let event = if let Some(next_event) = self.next_event.as_mut() {
                let e = match Future::poll(Pin::new(&mut **next_event), cx) {
                    Poll::Ready(Ok(ev)) => ffi::ListenEvent::from(ev),
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                };
                // The promise has resolved; the next loop iteration fetches a new one.
                self.next_event = None;
                e
            } else {
                return Poll::Ready(None);
            };

            for addr in event
                .new_addrs()
                .into_iter()
                .flat_map(|e| e.to_vec().into_iter())
            {
                let addr = js_value_to_addr(&addr)?;
                self.pending_events
                    .push_back(ListenerEvent::NewAddress(addr));
            }

            for upgrade in event
                .new_connections()
                .into_iter()
                .flat_map(|e| e.to_vec().into_iter())
            {
                let upgrade: ffi::ConnectionEvent = upgrade.into();
                self.pending_events.push_back(ListenerEvent::Upgrade {
                    local_addr: upgrade.local_addr().parse()?,
                    remote_addr: upgrade.observed_addr().parse()?,
                    upgrade: futures::future::ok(Connection::new(upgrade.connection())),
                });
            }

            for addr in event
                .expired_addrs()
                .into_iter()
                .flat_map(|e| e.to_vec().into_iter())
            {
                match js_value_to_addr(&addr) {
                    // Expired addresses must be reported as expired; reporting them as
                    // `NewAddress` would re-announce an address that is going away.
                    Ok(addr) => self
                        .pending_events
                        .push_back(ListenerEvent::AddressExpired(addr)),
                    Err(err) => self.pending_events.push_back(ListenerEvent::Error(err)),
                }
            }
        }
    }
}
/// An open connection whose reads and writes are carried out by a JavaScript object.
pub struct Connection {
// Handle to the JavaScript connection object.
inner: SendWrapper<ffi::Connection>,
// Iterator obtained from the connection's `read` getter; yields promises of data.
read_iterator: SendWrapper<js_sys::Iterator>,
// Where the reader currently stands (buffered bytes, awaiting a promise, or done).
read_state: ConnectionReadState,
// Promise of the most recent write; it must settle before another write is issued.
previous_write_promise: Option<SendWrapper<JsFuture>>,
}
impl Connection {
    /// Builds the Rust-side state around a JavaScript connection object.
    ///
    /// The read iterator is fetched once up front; reading starts with an empty
    /// buffer and no write in flight.
    fn new(inner: ffi::Connection) -> Self {
        let read_iterator = SendWrapper::new(inner.read());
        let inner = SendWrapper::new(inner);

        Self {
            inner,
            read_iterator,
            read_state: ConnectionReadState::PendingData(Vec::new()),
            previous_write_promise: None,
        }
    }
}
/// Reading side state of a `Connection`.
enum ConnectionReadState {
// Bytes received but not yet handed to the reader; empty means "fetch the next promise".
PendingData(Vec<u8>),
// A read promise has been obtained from the iterator and has not settled yet.
Waiting(SendWrapper<JsFuture>),
// No more data can be read; further reads fail with `BrokenPipe`.
Finished,
}
impl fmt::Debug for Connection {
    /// Prints only the type name; the JavaScript handles are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Connection")
    }
}
impl AsyncRead for Connection {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
loop {
match mem::replace(&mut self.read_state, ConnectionReadState::Finished) {
ConnectionReadState::Finished => break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
ConnectionReadState::PendingData(ref data) if data.is_empty() => {
let iter_next = self.read_iterator.next().map_err(JsErr::from)?;
if iter_next.done() {
self.read_state = ConnectionReadState::Finished;
} else {
let promise: js_sys::Promise = iter_next.value().into();
let promise = SendWrapper::new(promise.into());
self.read_state = ConnectionReadState::Waiting(promise);
}
continue;
}
ConnectionReadState::PendingData(mut data) => {
debug_assert!(!data.is_empty());
if buf.len() <= data.len() {
buf.copy_from_slice(&data[..buf.len()]);
self.read_state =
ConnectionReadState::PendingData(data.split_off(buf.len()));
break Poll::Ready(Ok(buf.len()));
} else {
let len = data.len();
buf[..len].copy_from_slice(&data);
self.read_state = ConnectionReadState::PendingData(Vec::new());
break Poll::Ready(Ok(len));
}
}
ConnectionReadState::Waiting(mut promise) => {
let data = match Future::poll(Pin::new(&mut *promise), cx) {
Poll::Ready(Ok(ref data)) if data.is_null() => break Poll::Ready(Ok(0)),
Poll::Ready(Ok(data)) => data,
Poll::Ready(Err(err)) => break Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
Poll::Pending => {
self.read_state = ConnectionReadState::Waiting(promise);
break Poll::Pending;
}
};
let data = js_sys::Uint8Array::new(&data);
let data_len = data.length() as usize;
if data_len <= buf.len() {
data.copy_to(&mut buf[..data_len]);
self.read_state = ConnectionReadState::PendingData(Vec::new());
break Poll::Ready(Ok(data_len));
} else {
let mut tmp_buf = vec![0; data_len];
data.copy_to(&mut tmp_buf[..]);
self.read_state = ConnectionReadState::PendingData(tmp_buf);
continue;
}
}
}
}
}
}
impl AsyncWrite for Connection {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
if let Some(mut promise) = self.previous_write_promise.take() {
match Future::poll(Pin::new(&mut *promise), cx) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => return Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
Poll::Pending => {
self.previous_write_promise = Some(promise);
return Poll::Pending;
}
}
}
debug_assert!(self.previous_write_promise.is_none());
self.previous_write_promise = Some(SendWrapper::new(
self.inner.write(buf).map_err(JsErr::from)?.into(),
));
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match self.inner.shutdown() {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(io::Error::from(JsErr::from(err)))),
}
}
}
impl Drop for Connection {
fn drop(&mut self) {
self.inner.close();
}
}
fn is_not_supported_error(err: &JsValue) -> bool {
if let Some(err) = err.dyn_ref::<js_sys::Error>() {
if String::from(err.name()) == "NotSupportedError" {
true
} else {
false
}
} else {
false
}
}
fn js_value_to_addr(addr: &JsValue) -> Result<Multiaddr, JsErr> {
if let Some(addr) = addr.as_string() {
Ok(addr.parse()?)
} else {
Err(JsValue::from_str("Element in new_addrs is not a string").into())
}
}
/// Error type of this transport: an arbitrary JavaScript value, kept inside a `SendWrapper`.
pub struct JsErr(SendWrapper<JsValue>);
impl From<JsValue> for JsErr {
    /// Wraps a raw JavaScript value as an error.
    fn from(val: JsValue) -> JsErr {
        let wrapped = SendWrapper::new(val);
        JsErr(wrapped)
    }
}
impl From<libp2p_core::multiaddr::Error> for JsErr {
fn from(err: libp2p_core::multiaddr::Error) -> JsErr {
JsValue::from_str(&err.to_string()).into()
}
}
impl From<JsErr> for io::Error {
    /// Converts to an `io::Error` of kind `Other` whose payload is the error's
    /// `Display` text.
    fn from(err: JsErr) -> io::Error {
        let message = err.to_string();
        io::Error::new(io::ErrorKind::Other, message)
    }
}
impl fmt::Debug for JsErr {
    /// Debug output is the same text as the `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}
impl fmt::Display for JsErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(s) = self.0.as_string() {
write!(f, "{}", s)
} else if let Some(err) = self.0.dyn_ref::<js_sys::Error>() {
write!(f, "{}", String::from(err.message()))
} else if let Some(obj) = self.0.dyn_ref::<js_sys::Object>() {
write!(f, "{}", String::from(obj.to_string()))
} else {
write!(f, "{:?}", &*self.0)
}
}
}
// `JsErr` relies entirely on the default `Error` methods; no underlying source is exposed.
impl error::Error for JsErr {}