use crate::{
    Config,
    WindowUpdateMode,
    chunks::Chunks,
    connection::{self, StreamCommand},
    frame::{
        Frame,
        header::{Header, StreamId, Data, WindowUpdate}
    }
};
use futures::{future::Either, ready, channel::mpsc, io::{AsyncRead, AsyncWrite}};
use parking_lot::{Mutex, MutexGuard};
use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll, Waker}};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
    
    Open,
    
    SendClosed,
    
    RecvClosed,
    
    Closed
}
impl State {
    
    pub fn can_read(self) -> bool {
        if let State::RecvClosed | State::Closed = self {
            false
        } else {
            true
        }
    }
    
    pub fn can_write(self) -> bool {
        if let State::SendClosed | State::Closed = self {
            false
        } else {
            true
        }
    }
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum Flag {
    
    None,
    
    Syn,
    
    Ack
}
pub struct Stream {
    id: StreamId,
    conn: connection::Id,
    config: Arc<Config>,
    sender: mpsc::Sender<StreamCommand>,
    window_update: Option<Frame<WindowUpdate>>,
    flag: Flag,
    shared: Arc<Mutex<Shared>>
}
impl fmt::Debug for Stream {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Stream")
            .field("id", &self.id.val())
            .field("connection", &self.conn)
            .field("window_update", &self.window_update.is_some())
            .finish()
    }
}
impl fmt::Display for Stream {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "(Stream {}/{})", self.conn, self.id.val())
    }
}
impl Stream {
    pub(crate) fn new
        ( id: StreamId
        , conn: connection::Id
        , config: Arc<Config>
        , window: u32
        , credit: u32
        , sender: mpsc::Sender<StreamCommand>
        ) -> Self
    {
        Stream {
            id,
            conn,
            config,
            sender,
            window_update: None,
            flag: Flag::None,
            shared: Arc::new(Mutex::new(Shared::new(window, credit))),
        }
    }
    
    pub fn id(&self) -> StreamId {
        self.id
    }
    
    pub(crate) fn set_flag(&mut self, flag: Flag) {
        self.flag = flag
    }
    
    pub(crate) fn state(&self) -> State {
        self.shared().state()
    }
    pub(crate) fn strong_count(&self) -> usize {
        Arc::strong_count(&self.shared)
    }
    pub(crate) fn shared(&self) -> MutexGuard<'_, Shared> {
        self.shared.lock()
    }
    pub(crate) fn clone(&self) -> Self {
        Stream {
            id: self.id,
            conn: self.conn,
            config: self.config.clone(),
            sender: self.sender.clone(),
            window_update: None,
            flag: self.flag,
            shared: self.shared.clone()
        }
    }
    fn write_zero_err(&self) -> io::Error {
        let msg = format!("{}/{}: connection is closed", self.conn, self.id);
        io::Error::new(io::ErrorKind::WriteZero, msg)
    }
    
    fn add_flag(&mut self, header: &mut Header<Either<Data, WindowUpdate>>) {
        match self.flag {
            Flag::None => (),
            Flag::Syn => {
                header.syn();
                self.flag = Flag::None
            }
            Flag::Ack => {
                header.ack();
                self.flag = Flag::None
            }
        }
    }
    
    fn send_pending_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
        if self.window_update.is_some() {
            ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?);
            let mut frame = self.window_update.take().expect("window_update.is_some()").right();
            self.add_flag(frame.header_mut());
            let cmd = StreamCommand::SendFrame(frame);
            self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?;
            self.shared().window = self.config.receive_window
        }
        Poll::Ready(Ok(()))
    }
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Packet(Vec<u8>);
impl AsRef<[u8]> for Packet {
    fn as_ref(&self) -> &[u8] {
        self.0.as_ref()
    }
}
impl futures::stream::Stream for Stream {
    type Item = io::Result<Packet>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if !self.config.read_after_close && self.sender.is_closed() {
            return Poll::Ready(None)
        }
        ready!(self.send_pending_window_update(cx))?;
        
        
        {
            let mut shared = self.shared();
            if let Some(bytes) = shared.buffer.pop() {
                let off = bytes.offset();
                let mut vec = bytes.into_vec();
                if off != 0 {
                    
                    
                    
                    
                    log::debug!("{}/{}: chunk has been partially consumed", self.conn, self.id);
                    vec = vec.split_off(off)
                }
                return Poll::Ready(Some(Ok(Packet(vec))))
            }
            
            if !shared.state().can_read() {
                log::debug!("{}/{}: eof", self.conn, self.id);
                return Poll::Ready(None) 
            }
            
            
            shared.reader = Some(cx.waker().clone());
            
            if self.config.window_update_mode != WindowUpdateMode::OnRead || shared.window > 0 {
                
                return Poll::Pending
            }
        }
        
        debug_assert!(self.window_update.is_none());
        self.window_update = Some(Frame::window_update(self.id, self.config.receive_window));
        ready!(self.send_pending_window_update(cx))?;
        Poll::Pending
    }
}
impl AsyncRead for Stream {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        if !self.config.read_after_close && self.sender.is_closed() {
            return Poll::Ready(Ok(0))
        }
        ready!(self.send_pending_window_update(cx))?;
        
        
        {
            
            let mut shared = self.shared();
            let mut n = 0;
            while let Some(chunk) = shared.buffer.front_mut() {
                if chunk.is_empty() {
                    shared.buffer.pop();
                    continue
                }
                let k = std::cmp::min(chunk.len(), buf.len() - n);
                (&mut buf[n .. n + k]).copy_from_slice(&chunk.as_ref()[.. k]);
                n += k;
                chunk.advance(k);
                if n == buf.len() {
                    break
                }
            }
            if n > 0 {
                log::trace!("{}/{}: read {} bytes", self.conn, self.id, n);
                return Poll::Ready(Ok(n))
            }
            
            if !shared.state().can_read() {
                log::debug!("{}/{}: eof", self.conn, self.id);
                return Poll::Ready(Ok(0)) 
            }
            
            
            shared.reader = Some(cx.waker().clone());
            
            if self.config.window_update_mode != WindowUpdateMode::OnRead || shared.window > 0 {
                
                return Poll::Pending
            }
        }
        
        debug_assert!(self.window_update.is_none());
        self.window_update = Some(Frame::window_update(self.id, self.config.receive_window));
        ready!(self.send_pending_window_update(cx))?;
        Poll::Pending
    }
}
impl AsyncWrite for Stream {
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?);
        let body = {
            let mut shared = self.shared();
            if !shared.state().can_write() {
                log::debug!("{}/{}: can no longer write", self.conn, self.id);
                return Poll::Ready(Err(self.write_zero_err()))
            }
            if shared.credit == 0 {
                log::trace!("{}/{}: no more credit left", self.conn, self.id);
                shared.writer = Some(cx.waker().clone());
                return Poll::Pending
            }
            let k = std::cmp::min(shared.credit as usize, buf.len());
            shared.credit = shared.credit.saturating_sub(k as u32);
            Vec::from(&buf[.. k])
        };
        let n = body.len();
        let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left();
        self.add_flag(frame.header_mut());
        log::trace!("{}/{}: write {} bytes", self.conn, self.id, n);
        let cmd = StreamCommand::SendFrame(frame);
        self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?;
        Poll::Ready(Ok(n))
    }
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        if self.state() == State::Closed {
            return Poll::Ready(Ok(()))
        }
        ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?);
        let ack = if self.flag == Flag::Ack {
            self.flag = Flag::None;
            true
        } else {
            false
        };
        log::trace!("{}/{}: close", self.conn, self.id);
        let cmd = StreamCommand::CloseStream { id: self.id, ack };
        self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?;
        self.shared().update_state(self.conn, self.id, State::SendClosed);
        Poll::Ready(Ok(()))
    }
}
#[derive(Debug)]
pub(crate) struct Shared {
    state: State,
    pub(crate) window: u32,
    pub(crate) credit: u32,
    pub(crate) buffer: Chunks,
    pub(crate) reader: Option<Waker>,
    pub(crate) writer: Option<Waker>
}
impl Shared {
    fn new(window: u32, credit: u32) -> Self {
        Shared {
            state: State::Open,
            window,
            credit,
            buffer: Chunks::new(),
            reader: None,
            writer: None
        }
    }
    pub(crate) fn state(&self) -> State {
        self.state
    }
    
    pub(crate) fn update_state(&mut self, cid: connection::Id, sid: StreamId, next: State) -> State {
        use self::State::*;
        let current = self.state;
        match (current, next) {
            (Closed,              _) => {}
            (Open,                _) => self.state = next,
            (RecvClosed,     Closed) => self.state = Closed,
            (RecvClosed,       Open) => {}
            (RecvClosed, RecvClosed) => {}
            (RecvClosed, SendClosed) => self.state = Closed,
            (SendClosed,     Closed) => self.state = Closed,
            (SendClosed,       Open) => {}
            (SendClosed, RecvClosed) => self.state = Closed,
            (SendClosed, SendClosed) => {}
        }
        log::trace!("{}/{}: update state: ({:?} {:?} {:?})", cid, sid, current, next, self.state);
        current 
    }
}