use crate::{Network, Validator};
use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL};
use sc_network::{Event, ReputationChange};
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender, Receiver};
use libp2p::PeerId;
use log::trace;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{
	borrow::Cow,
	collections::{HashMap, VecDeque},
	pin::Pin,
	sync::Arc,
	task::{Context, Poll},
};
/// Wraps around an implementation of the [`Network`] trait and provides gossiping capabilities on
/// top of it.
///
/// The engine implements [`Future`]; network events are only processed and messages only
/// forwarded to subscribers while that future is being polled.
pub struct GossipEngine<B: BlockT> {
	/// Gossip state machine that peer events, incoming messages and outgoing requests are fed into.
	state_machine: ConsensusGossip<B>,
	/// Handle to the underlying network, passed to the state machine on most calls.
	network: Box<dyn Network<B> + Send>,
	/// Timer that triggers `ConsensusGossip::tick` and the pruning of closed message sinks.
	periodic_maintenance_interval: futures_timer::Delay,
	/// Identifier of the consensus engine served; events carrying another id are skipped.
	engine_id: ConsensusEngineId,

	/// Incoming events from the network.
	network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
	/// Outgoing notifications to the consumers, grouped by topic.
	message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
	/// Notifications that still have to be forwarded to the sinks (see `ForwardingState`).
	forwarding_state: ForwardingState<B>,
}
/// Forwarding state of a `GossipEngine`.
///
/// The engine receives messages from the network and forwards them to the message sinks
/// subscribed to the respective topic. When a sink is not ready to accept a notification, the
/// notifications that are still outstanding are kept in the `Busy` queue.
enum ForwardingState<B: BlockT> {
	/// Nothing is waiting to be forwarded; the engine polls the network event stream for more
	/// events.
	Idle,
	/// Notifications, paired with their topic, that still have to be sent into the subscribed
	/// sinks. The network event stream is not polled while the engine is in this state; the
	/// engine goes back to `Idle` once the queue has been drained.
	Busy(VecDeque<(B::Hash, TopicNotification)>),
}
// `GossipEngine` is declared `Unpin` so that its `poll` implementation can reborrow the pinned
// `self` as a plain `&mut Self` and callers can drive it with `poll_unpin`.
impl<B: BlockT> Unpin for GossipEngine<B> {}
impl<B: BlockT> GossipEngine<B> {
	/// Builds a gossip engine on top of `network`.
	///
	/// `engine_id` and `protocol_name` are registered with the network as a notifications
	/// protocol; `validator` and `engine_id` are handed to the gossip state machine.
	pub fn new<N: Network<B> + Send + Clone + 'static>(
		network: N,
		engine_id: ConsensusEngineId,
		protocol_name: impl Into<Cow<'static, str>>,
		validator: Arc<dyn Validator<B>>,
	) -> Self where B: 'static {
		// The event stream is obtained before the protocol is registered.
		// NOTE(review): presumably so that no event caused by the registration is missed.
		let events = network.event_stream();
		network.register_notifications_protocol(engine_id, protocol_name.into());

		let state_machine = ConsensusGossip::new(validator, engine_id);
		let maintenance_timer = futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL);

		Self {
			state_machine,
			network: Box::new(network),
			periodic_maintenance_interval: maintenance_timer,
			engine_id,
			network_event_stream: events,
			message_sinks: HashMap::new(),
			forwarding_state: ForwardingState::Idle,
		}
	}

	/// Reports the reputation change `reputation` for peer `who` to the network.
	pub fn report(&self, who: PeerId, reputation: ReputationChange) {
		self.network.report_peer(who, reputation);
	}

	/// Registers `message` under `topic` with the gossip state machine.
	///
	/// The network handle is not involved in this call, so nothing is sent to peers from here.
	pub fn register_gossip_message(
		&mut self,
		topic: B::Hash,
		message: Vec<u8>,
	) {
		let gossip = &mut self.state_machine;
		gossip.register_message(topic, message);
	}

	/// Asks the state machine to broadcast the messages of `topic` over the network.
	///
	/// `force` is passed through to the state machine unchanged.
	pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
		let Self { state_machine, network, .. } = self;
		state_machine.broadcast_topic(&mut **network, topic, force);
	}

	/// Returns a receiver of notifications for `topic`.
	///
	/// The messages the state machine currently yields for the topic are queued on the channel
	/// right away. The sending half is then stored as a message sink, so that notifications
	/// forwarded while the engine is polled reach the returned receiver as well.
	pub fn messages_for(&mut self, topic: B::Hash)
		-> Receiver<TopicNotification>
	{
		let backlog: Vec<_> = self.state_machine.messages_for(topic).collect();

		// Nothing drains the receiver before this function returns, hence the buffer has to hold
		// the complete backlog. A minimum of 10 slots is kept for short or empty backlogs.
		let capacity = backlog.len().max(10);
		let (mut sender, receiver) = channel(capacity);

		backlog.into_iter().for_each(|notification| {
			sender.try_send(notification)
				.expect("receiver known to be live, and buffer size known to suffice; qed");
		});

		self.message_sinks.entry(topic).or_default().push(sender);

		receiver
	}

	/// Asks the state machine to send the messages of `topic` to the peer `who`.
	///
	/// `force` is passed through to the state machine unchanged.
	pub fn send_topic(
		&mut self,
		who: &PeerId,
		topic: B::Hash,
		force: bool
	) {
		let Self { state_machine, network, .. } = self;
		state_machine.send_topic(&mut **network, who, topic, force);
	}

	/// Hands `message` with the given `topic` to the state machine's `multicast`.
	///
	/// `force` is passed through to the state machine unchanged.
	pub fn gossip_message(
		&mut self,
		topic: B::Hash,
		message: Vec<u8>,
		force: bool,
	) {
		let Self { state_machine, network, .. } = self;
		state_machine.multicast(&mut **network, topic, message, force);
	}

	/// Sends `data` to each peer listed in `who`, one state machine call per peer.
	///
	/// Every call receives its own copy of `data`.
	pub fn send_message(&mut self, who: Vec<sc_network::PeerId>, data: Vec<u8>) {
		let Self { state_machine, network, .. } = self;
		for peer in who.iter() {
			let payload = data.clone();
			state_machine.send_message(&mut **network, peer, payload);
		}
	}

	/// Passes a block announcement for `block` together with `associated_data` to the network.
	pub fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
		let network = &self.network;
		network.announce(block, associated_data);
	}
}
/// Drives the gossip engine.
///
/// Each poll alternates between two activities until neither can make progress: pulling events
/// from the network event stream (state `Idle`) and forwarding the resulting notifications to
/// the subscribed message sinks (state `Busy`). Afterwards the periodic maintenance is run if
/// its timer fired.
impl<B: BlockT> Future for GossipEngine<B> {
	/// The future resolves without a value, and only once the network event stream has ended.
	type Output = ();
	/// Returns `Poll::Ready(())` when the network event stream is closed and `Poll::Pending` in
	/// every other case.
	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = &mut *self;
		'outer: loop {
			match &mut this.forwarding_state {
				ForwardingState::Idle => {
					// Nothing left to forward: look for the next network event.
					match this.network_event_stream.poll_next_unpin(cx) {
						Poll::Ready(Some(event)) => match event {
							Event::NotificationStreamOpened { remote, engine_id, role } => {
								// Streams opened for another engine are none of our business.
								if engine_id != this.engine_id {
									continue;
								}
								this.state_machine.new_peer(&mut *this.network, remote, role);
							}
							Event::NotificationStreamClosed { remote, engine_id } => {
								// Same filter as above for closed streams.
								if engine_id != this.engine_id {
									continue;
								}
								this.state_machine.peer_disconnected(&mut *this.network, remote);
							},
							Event::NotificationsReceived { remote, messages } => {
								// Keep only the payloads addressed to this engine.
								let messages = messages.into_iter().filter_map(|(engine, data)| {
									if engine == this.engine_id {
										Some(data.to_vec())
									} else {
										None
									}
								}).collect();
								// Whatever the state machine returns is queued for forwarding to
								// the message sinks; the next loop iteration starts working on it.
								let to_forward = this.state_machine.on_incoming(
									&mut *this.network,
									remote,
									messages,
								);
								this.forwarding_state = ForwardingState::Busy(to_forward.into());
							},
							// DHT events are ignored by the gossip engine.
							Event::Dht(_) => {}
						}
						// The network event stream closed, so the gossip engine finishes as well.
						Poll::Ready(None) => return Poll::Ready(()),
						// No event available right now; continue with the maintenance below.
						Poll::Pending => break,
					}
				}
				ForwardingState::Busy(to_forward) => {
					// Take the next queued notification; an empty queue ends the busy phase.
					let (topic, notification) = match to_forward.pop_front() {
						Some(n) => n,
						None => {
							this.forwarding_state = ForwardingState::Idle;
							continue;
						}
					};
					let sinks = match this.message_sinks.get_mut(&topic) {
						Some(sinks) => sinks,
						// Nobody subscribed to this topic: the notification is dropped.
						None => {
							continue;
						},
					};
					// Make sure all sinks for the given topic are ready.
					for sink in sinks.iter_mut() {
						match sink.poll_ready(cx) {
							Poll::Ready(Ok(())) => {},
							// The sink reported an error. Ignored here; closed sinks are filtered
							// out in (1).
							Poll::Ready(Err(_)) => {},
							Poll::Pending => {
								// Push back onto the front of the queue, preserving the order,
								// and stop forwarding for this poll.
								to_forward.push_front((topic, notification));
								break 'outer;
							}
						}
					}
					// Filter out all closed sinks.
					sinks.retain(|sink| !sink.is_closed()); // (1)
					if sinks.is_empty() {
						// No live subscriber is left for the topic: forget the topic entry and
						// drop the notification.
						this.message_sinks.remove(&topic);
						continue;
					}
					trace!(
						target: "gossip",
						"Pushing consensus message to sinks for {}.", topic,
					);
					// Send a copy of the notification on each sink.
					for sink in sinks {
						match sink.start_send(notification.clone()) {
							Ok(()) => {},
							Err(e) if e.is_full() => unreachable!(
								"Previously ensured that all sinks are ready; qed.",
							),
							// Any other send error is ignored; sinks that are closed get removed
							// by (1) on a later iteration or by the maintenance below.
							Err(_) => {},
						}
					}
				}
			}
		}
		// Periodic maintenance: each time the timer has fired it is re-armed, the state machine is
		// ticked and closed sinks as well as topics without any sink are pruned. The loop polls the
		// re-armed timer again before leaving.
		while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
			this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
			this.state_machine.tick(&mut *this.network);
			this.message_sinks.retain(|_, sinks| {
				sinks.retain(|sink| !sink.is_closed());
				!sinks.is_empty()
			});
		}
		Poll::Pending
	}
}
#[cfg(test)]
mod tests {
	use async_std::task::spawn;
	use crate::{ValidationResult, ValidatorContext};
	use futures::{channel::mpsc::{unbounded, UnboundedSender}, executor::{block_on, block_on_stream}, future::poll_fn};
	use quickcheck::{Arbitrary, Gen, QuickCheck};
	use rand::Rng;
	use sc_network::ObservedRole;
	use sp_runtime::{testing::H256, traits::{Block as BlockT}};
	use std::convert::TryInto;
	use std::sync::{Arc, Mutex};
	use substrate_test_runtime_client::runtime::Block;
	use super::*;
	/// Network stand-in for the tests. Clones share the same inner state, which lets a test keep
	/// a handle to the event senders after giving a clone to the gossip engine.
	#[derive(Clone, Default)]
	struct TestNetwork {
		/// State shared between all clones of this network.
		inner: Arc<Mutex<TestNetworkInner>>,
	}
	/// Shared state of a `TestNetwork`.
	#[derive(Clone, Default)]
	struct TestNetworkInner {
		/// Sending halves of the event streams handed out by `event_stream`, one per call.
		event_senders: Vec<UnboundedSender<Event>>,
	}
	/// Minimal `Network` implementation: only the functionality exercised by the tests below is
	/// provided, every other method panics with `unimplemented!`.
	impl<B: BlockT> Network<B> for TestNetwork {
		/// Opens a new unbounded event channel. The sending half is stored in the shared state so
		/// that a test can inject events, the receiving half is returned as the event stream.
		fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
			let (sender, receiver) = unbounded();
			{
				let mut shared = self.inner.lock().unwrap();
				shared.event_senders.push(sender);
			}
			Box::pin(receiver)
		}

		/// Reputation reports are accepted and discarded.
		fn report_peer(&self, _peer: PeerId, _change: ReputationChange) {}

		/// Not expected to be called by the tests.
		fn disconnect_peer(&self, _peer: PeerId) {
			unimplemented!();
		}

		/// Not expected to be called by the tests.
		fn write_notification(&self, _peer: PeerId, _engine: ConsensusEngineId, _data: Vec<u8>) {
			unimplemented!();
		}

		/// Protocol registration is accepted and discarded.
		fn register_notifications_protocol(
			&self,
			_engine: ConsensusEngineId,
			_name: Cow<'static, str>,
		) {
		}

		/// Not expected to be called by the tests.
		fn announce(&self, _block: B::Hash, _data: Vec<u8>) {
			unimplemented!();
		}
	}
	/// Validator that accepts every message.
	struct AllowAll;

	impl Validator<Block> for AllowAll {
		/// Keeps each message, no matter its sender or content, and files it under the default
		/// (all-zero) topic.
		fn validate(
			&self,
			_context: &mut dyn ValidatorContext<Block>,
			_sender: &PeerId,
			_data: &[u8],
		) -> ValidationResult<H256> {
			let topic = H256::default();
			ValidationResult::ProcessAndKeep(topic)
		}
	}
	
	
	
	
	/// The gossip engine future has to resolve as soon as the network event stream ends. Here the
	/// sending half of the stream is dropped before the first poll, so that very first poll is
	/// expected to return `Poll::Ready`.
	#[test]
	fn returns_when_network_event_stream_closes() {
		let network = TestNetwork::default();
		let validator = Arc::new(AllowAll);
		let mut gossip_engine = GossipEngine::<Block>::new(
			network.clone(),
			[1, 2, 3, 4],
			"my_protocol",
			validator,
		);

		// Dropping the sender registered by `GossipEngine::new` closes the event stream.
		let event_sender = network.inner.lock().unwrap().event_senders.pop();
		drop(event_sender);

		block_on(poll_fn(move |ctx| match gossip_engine.poll_unpin(ctx) {
			Poll::Ready(()) => Poll::Ready(()),
			Poll::Pending => panic!(
				"Expected gossip engine to finish on first poll, given that \
				 `GossipEngine.network_event_stream` closes right away."
			),
		}))
	}
	/// Two subscribers of the same topic both have to observe every message of that topic in
	/// order, whether the corresponding network event was emitted before or after they
	/// subscribed.
	#[test]
	fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
		let topic = H256::default();
		let engine_id = [1, 2, 3, 4];
		let remote_peer = PeerId::random();
		let network = TestNetwork::default();

		let mut gossip_engine = GossipEngine::<Block>::new(
			network.clone(),
			engine_id.clone(),
			"my_protocol",
			Arc::new(AllowAll),
		);

		let mut event_sender = network.inner.lock()
			.unwrap()
			.event_senders
			.pop()
			.unwrap();
		let mut emit = |event: Event| {
			event_sender.start_send(event).expect("Event stream is unbounded; qed.")
		};

		// Register the remote peer.
		emit(Event::NotificationStreamOpened {
			remote: remote_peer.clone(),
			engine_id: engine_id.clone(),
			role: ObservedRole::Authority,
		});

		let messages = vec![vec![1], vec![2]];
		let mut events = Vec::with_capacity(messages.len());
		for payload in messages.iter().cloned() {
			events.push(Event::NotificationsReceived {
				remote: remote_peer.clone(),
				messages: vec![(engine_id, payload.into())]
			});
		}

		// First message: emitted before anybody subscribed.
		emit(events[0].clone());

		let subscribers = (0..2)
			.map(|_| gossip_engine.messages_for(topic))
			.collect::<Vec<_>>();

		// Second message: emitted after the subscriptions.
		emit(events[1].clone());

		spawn(gossip_engine);

		let mut subscribers = subscribers.into_iter()
			.map(block_on_stream)
			.collect::<Vec<_>>();

		// Every subscriber has to see all messages, in the order they were emitted.
		for payload in messages {
			let expected = Some(TopicNotification {
				message: payload,
				sender: Some(remote_peer.clone()),
			});
			for stream in subscribers.iter_mut() {
				assert_eq!(stream.next(), expected);
			}
		}
	}
	/// Property test: for arbitrary sets of subscriber channels (differing in buffer size and
	/// topic) and arbitrary batches of incoming messages, every channel has to receive exactly the
	/// messages of its topic. The check is done on the number of messages per topic, summed over
	/// all channels of that topic, and it is done in both directions: nothing expected may be
	/// missing and nothing unexpected may be received.
	#[test]
	fn forwarding_to_different_size_and_topic_channels() {
		/// Buffer size and topic of one subscriber channel.
		#[derive(Clone, Debug)]
		struct ChannelLengthAndTopic{
			length: usize,
			topic: H256,
		}
		impl Arbitrary for ChannelLengthAndTopic {
			fn arbitrary<G: Gen>(g: &mut G) -> Self {
				Self {
					length: g.gen_range(0, 100),
					// Topics are drawn from a small range so that channel topics and message
					// topics overlap.
					topic: H256::from_low_u64_ne(g.gen_range(0, 10)),
				}
			}
		}
		/// A gossip message, reduced to the topic it belongs to.
		#[derive(Clone, Debug)]
		struct Message {
			topic: H256,
		}
		impl Arbitrary for Message{
			fn arbitrary<G: Gen>(g: &mut G) -> Self {
				Self {
					// Topics are drawn from a small range so that channel topics and message
					// topics overlap.
					topic: H256::from_low_u64_ne(g.gen_range(0, 10)),
				}
			}
		}
		/// Validator that keeps every message and reads the topic from the first 32 bytes of the
		/// message data.
		struct TestValidator;
		impl Validator<Block> for TestValidator {
			fn validate(
				&self,
				_context: &mut dyn ValidatorContext<Block>,
				_sender: &PeerId,
				data: &[u8],
			) -> ValidationResult<H256> {
				ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
			}
		}
		/// The property itself. `channels` describes the subscriber channels to register,
		/// `notifications` the batches of messages arriving from the network (one
		/// `NotificationsReceived` event per inner vector).
		fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
			let engine_id = [1, 2, 3, 4];
			let remote_peer = PeerId::random();
			let network = TestNetwork::default();
			let num_channels_per_topic = channels.iter()
				.fold(HashMap::new(), |mut acc, ChannelLengthAndTopic { topic, .. }| {
					acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
					acc
				});
			let expected_msgs_per_topic_all_chan = notifications.iter()
				.fold(HashMap::new(), |mut acc, messages| {
					for message in messages {
						acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
					}
					acc
				})
				.into_iter()
				// Messages are cloned for each channel with the corresponding topic, thus multiply
				// the number of messages of a topic with the number of channels of that topic. A
				// topic without any channel is expected to deliver nothing.
				.map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
				.collect::<HashMap<H256, _>>();
			let mut gossip_engine = GossipEngine::<Block>::new(
				network.clone(),
				engine_id.clone(),
				"my_protocol",
				Arc::new(TestValidator{}),
			);
			// Create the channels.
			let (txs, mut rxs) = channels.iter()
				.map(|ChannelLengthAndTopic { length, topic }| {
					(topic.clone(), channel(*length))
				})
				.fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
					acc.0.push((topic, tx)); acc.1.push((topic, rx));
					acc
				});
			// Register the sending halves directly as message sinks of the gossip engine. Going
			// through `messages_for` would not allow choosing the buffer size.
			for (topic, tx) in txs {
				gossip_engine.message_sinks.entry(topic).or_default().push(tx);
			}
			let mut event_sender = network.inner.lock()
				.unwrap()
				.event_senders
				.pop()
				.unwrap();
			// Register the remote peer.
			event_sender.start_send(
				Event::NotificationStreamOpened {
					remote: remote_peer.clone(),
					engine_id: engine_id.clone(),
					role: ObservedRole::Authority,
				}
			).expect("Event stream is unbounded; qed.");
			// Send the messages into the network event stream.
			for (i_notification, messages) in notifications.iter().enumerate() {
				let messages = messages.into_iter().enumerate()
					.map(|(i_message, Message { topic })| {
						// The message data starts with the topic, which is where `TestValidator`
						// reads it from.
						let mut message = topic.as_bytes().to_vec();
						// The batch index and the index within the batch are appended so that
						// every message is unique. Both indices have to fit into a `u8`.
						message.push(i_notification.try_into().unwrap());
						message.push(i_message.try_into().unwrap());
						(engine_id, message.into())
					}).collect();
				event_sender.start_send(Event::NotificationsReceived {
					remote: remote_peer.clone(),
					messages,
				}).expect("Event stream is unbounded; qed.");
			}
			let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
			// Poll both the gossip engine and each receiver, counting the received messages per
			// topic, until a full round yields no message anymore.
			block_on(poll_fn(|cx| {
				loop {
					if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
						unreachable!(
							"Event stream sender side is not dropped, thus gossip engine does not \
							 terminate",
						);
					}
					let mut progress = false;
					for (topic, rx) in rxs.iter_mut() {
						match rx.poll_next_unpin(cx) {
							Poll::Ready(Some(_)) => {
								progress = true;
								received_msgs_per_topic_all_chan.entry(*topic)
									.and_modify(|e| *e += 1)
									.or_insert(1);
							},
							Poll::Ready(None) => unreachable!(
								"Sender side of channel is never dropped",
							),
							Poll::Pending => {},
						}
					}
					if !progress {
						break;
					}
				}
				Poll::Ready(())
			}));
			// Everything that was expected has been received ...
			for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
				assert_eq!(
					received_msgs_per_topic_all_chan.get(expected_topic).unwrap_or(&0),
					expected_num,
				);
			}
			// ... and everything that has been received was expected. This direction has to walk
			// over the *received* messages; comparing the expectation with itself would pass no
			// matter what the gossip engine delivered.
			for (received_topic, received_num) in received_msgs_per_topic_all_chan.iter() {
				assert_eq!(
					expected_msgs_per_topic_all_chan.get(received_topic).unwrap_or(&0),
					received_num,
				);
			}
		}
		// Past regressions.
		prop(vec![], vec![vec![Message{ topic: H256::default()}]]);
		prop(
			vec![ChannelLengthAndTopic {length: 71, topic: H256::default()}],
			vec![vec![Message{ topic: H256::default()}]],
		);
		QuickCheck::new().quickcheck(prop as fn(_, _))
	}
}