use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::PublishError;
use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, SIGNING_PREFIX,
};
use crate::rpc_proto;
use crate::topic::{Topic, TopicHash};
use futures::prelude::*;
use libp2p_core::{
connection::ConnectionId, identity::error::SigningError, identity::Keypair, Multiaddr, PeerId,
};
use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
};
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
use prost::Message;
use rand;
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::HashSet,
collections::VecDeque,
collections::{hash_map::HashMap, BTreeSet},
fmt, iter,
sync::Arc,
task::{Context, Poll},
};
use wasm_timer::{Instant, Interval};
mod tests;
/// Determines how messages published by this node are authored and authenticated.
///
/// The variant chosen here is converted into the internal publishing configuration
/// when the behaviour is created.
#[derive(Clone)]
pub enum MessageAuthenticity {
/// Messages are signed with the given keypair; the author is the peer id derived
/// from the keypair's public key.
Signed(Keypair),
/// Messages are not signed; the given peer id is used as the message source and a
/// random sequence number is attached.
Author(PeerId),
/// Messages are not signed; a freshly generated random peer id is used as the source
/// of every message, together with a random sequence number.
RandomAuthor,
/// Messages are not signed and carry neither a source nor a sequence number.
Anonymous,
}
impl MessageAuthenticity {
    /// Returns `true` when published messages are signed, i.e. for the `Signed` variant.
    fn is_signing(&self) -> bool {
        matches!(self, MessageAuthenticity::Signed(_))
    }

    /// Returns `true` only for the `Anonymous` variant.
    fn is_anonymous(&self) -> bool {
        matches!(self, MessageAuthenticity::Anonymous)
    }
}
/// Internal publishing configuration derived from a `MessageAuthenticity`.
enum PublishConfig {
/// Published messages are signed.
Signing {
/// Keypair used to sign outgoing messages.
keypair: Keypair,
/// Peer id derived from the keypair's public key; used as the message source.
author: PeerId,
/// Protobuf-encoded public key attached to each message. `None` when the encoding
/// is at most 42 bytes long.
inline_key: Option<Vec<u8>>,
},
/// Published messages are unsigned and use the given peer id as their source.
Author(PeerId),
/// Published messages are unsigned and use a random peer id as their source.
RandomAuthor,
/// Published messages are unsigned and carry no source or sequence number.
Anonymous,
}
impl From<MessageAuthenticity> for PublishConfig {
    /// Builds the internal publishing configuration for the requested authenticity.
    ///
    /// For `Signed`, the author is derived from the keypair's public key and the
    /// protobuf-encoded key is only kept for inclusion in messages when it is longer
    /// than 42 bytes.
    fn from(authenticity: MessageAuthenticity) -> Self {
        // Every non-signing variant maps one-to-one; only `Signed` needs extra work.
        let keypair = match authenticity {
            MessageAuthenticity::Signed(keypair) => keypair,
            MessageAuthenticity::Author(peer_id) => return PublishConfig::Author(peer_id),
            MessageAuthenticity::RandomAuthor => return PublishConfig::RandomAuthor,
            MessageAuthenticity::Anonymous => return PublishConfig::Anonymous,
        };

        let public_key = keypair.public();
        let encoded_key = public_key.clone().into_protobuf_encoding();
        // NOTE(review): presumably keys of at most 42 bytes are already embedded in the
        // peer id and therefore need not be sent explicitly - confirm.
        let inline_key = if encoded_key.len() > 42 {
            Some(encoded_key)
        } else {
            None
        };

        PublishConfig::Signing {
            keypair,
            author: public_key.into_peer_id(),
            inline_key,
        }
    }
}
/// Network behaviour that implements gossipsub-style publish/subscribe routing.
pub struct Gossipsub {
/// Configuration supplied at construction time.
config: GossipsubConfig,
/// Queue of actions (handler notifications and application events) handed out one at
/// a time by `poll`.
events: VecDeque<NetworkBehaviourAction<Arc<GossipsubRpc>, GossipsubEvent>>,
/// Control actions accumulated per peer; sent and cleared by `flush_control_pool`.
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
/// How outgoing messages are authored/signed.
publish_config: PublishConfig,
/// Time-expiring cache of message ids already published or received, used to drop
/// duplicates.
duplication_cache: LruCache<MessageId, ()>,
/// For each topic, the peers that have announced a subscription to it.
topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,
/// For each connected peer, the topics it has announced a subscription to.
peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,
/// For each topic we are subscribed to, the peers in our mesh for that topic.
mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
/// For each topic we publish to without being subscribed, the peers we publish to.
fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
/// Time of the last publish to each fanout topic; used to expire fanout entries.
fanout_last_pub: HashMap<TopicHash, Instant>,
/// Cache of recently seen messages.
mcache: MessageCache,
/// Timer that drives the periodic `heartbeat`.
heartbeat: Interval,
}
impl Gossipsub {
/// Creates a new behaviour with the given authenticity policy and configuration.
///
/// # Panics
///
/// Panics if `privacy` is incompatible with `config.validation_mode` (see
/// `validate_config`).
pub fn new(privacy: MessageAuthenticity, config: GossipsubConfig) -> Self {
    validate_config(&privacy, &config.validation_mode);

    // Build everything that reads from `config` first so `config` can be moved last.
    let duplication_cache = LruCache::with_expiry_duration(config.duplicate_cache_time);
    let mcache = MessageCache::new(
        config.history_gossip,
        config.history_length,
        config.message_id_fn,
    );
    let first_heartbeat = Instant::now() + config.heartbeat_initial_delay;
    let heartbeat = Interval::new_at(first_heartbeat, config.heartbeat_interval);

    Gossipsub {
        config,
        events: VecDeque::new(),
        control_pool: HashMap::new(),
        publish_config: privacy.into(),
        duplication_cache,
        topic_peers: HashMap::new(),
        peer_topics: HashMap::new(),
        mesh: HashMap::new(),
        fanout: HashMap::new(),
        fanout_last_pub: HashMap::new(),
        mcache,
        heartbeat,
    }
}
/// Iterates over the hashes of all topics that currently have a mesh entry, i.e. the
/// topics we are subscribed to.
pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
    self.mesh.iter().map(|(topic_hash, _)| topic_hash)
}
/// Iterates over the mesh peers of `topic_hash`.
///
/// Yields nothing when the topic has no mesh entry.
pub fn peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
    // `Option<&BTreeSet<_>>` iterates over zero or one set; flattening yields its peers.
    self.mesh.get(topic_hash).into_iter().flatten()
}
/// Iterates over every peer that is part of at least one topic mesh.
///
/// Each peer is yielded once, in sorted order, even if it is in several meshes.
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> {
    self.mesh
        .values()
        .flatten()
        .collect::<BTreeSet<_>>()
        .into_iter()
}
pub fn subscribe(&mut self, topic: Topic) -> bool {
debug!("Subscribing to topic: {}", topic);
let topic_hash = self.topic_hash(topic.clone());
if self.mesh.get(&topic_hash).is_some() {
debug!("Topic: {} is already in the mesh.", topic);
return false;
}
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
});
for peer in peer_list {
debug!("Sending SUBSCRIBE to peer: {:?}", peer);
self.send_message(peer, event.clone());
}
}
self.join(&topic_hash);
info!("Subscribed to topic: {}", topic);
true
}
pub fn unsubscribe(&mut self, topic: Topic) -> bool {
debug!("Unsubscribing from topic: {}", topic);
let topic_hash = &self.topic_hash(topic);
if self.mesh.get(topic_hash).is_none() {
debug!("Already unsubscribed from topic: {:?}", topic_hash);
return false;
}
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = Arc::new(GossipsubRpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
});
for peer in peer_list {
debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
self.send_message(peer, event.clone());
}
}
self.leave(&topic_hash);
info!("Unsubscribed from topic: {:?}", topic_hash);
true
}
/// Publishes `data` to a single topic.
///
/// Convenience wrapper around `publish_many`; see there for the error conditions.
pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) -> Result<(), PublishError> {
    let single_topic = iter::once(topic.clone());
    self.publish_many(single_topic, data)
}
pub fn publish_many(
&mut self,
topics: impl IntoIterator<Item = Topic>,
data: impl Into<Vec<u8>>,
) -> Result<(), PublishError> {
let message = self.build_message(
topics.into_iter().map(|t| self.topic_hash(t)).collect(),
data.into(),
)?;
let msg_id = (self.config.message_id_fn)(&message);
if self.duplication_cache.insert(msg_id.clone(), ()).is_some() {
warn!(
"Not publishing a message that has already been published. Msg-id {}",
msg_id
);
return Err(PublishError::Duplicate);
}
self.mcache.put(message.clone());
debug!("Publishing message: {:?}", msg_id);
let mesh_peers_sent = self.forward_msg(message.clone(), None);
let mut recipient_peers = HashSet::new();
for topic_hash in &message.topics {
if self.mesh.get(&topic_hash).is_none() {
debug!("Topic: {:?} not in the mesh", topic_hash);
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(peer.clone());
}
} else {
let mesh_n = self.config.mesh_n;
let new_peers =
Self::get_random_peers(&self.topic_peers, &topic_hash, mesh_n, {
|_| true
});
self.fanout.insert(topic_hash.clone(), new_peers.clone());
for peer in new_peers {
debug!("Peer added to fanout: {:?}", peer);
recipient_peers.insert(peer.clone());
}
}
self.fanout_last_pub
.insert(topic_hash.clone(), Instant::now());
}
}
if recipient_peers.is_empty() && !mesh_peers_sent {
return Err(PublishError::InsufficientPeers);
}
let event = Arc::new(GossipsubRpc {
subscriptions: Vec::new(),
messages: vec![message],
control_msgs: Vec::new(),
});
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
self.send_message(peer_id.clone(), event.clone());
}
info!("Published message: {:?}", msg_id);
Ok(())
}
/// Marks a cached message as validated and forwards it to the mesh.
///
/// `propagation_source` is the peer the message was received from; it is excluded
/// from the forwarding targets. Returns `false` (and forwards nothing) if the message
/// is no longer in the message cache, `true` otherwise.
pub fn validate_message(
    &mut self,
    message_id: &MessageId,
    propagation_source: &PeerId,
) -> bool {
    // Take an owned copy so the cache is no longer borrowed while forwarding.
    let cached = self.mcache.validate(message_id).map(|found| found.clone());
    match cached {
        Some(message) => {
            self.forward_msg(message, Some(propagation_source));
            true
        }
        None => {
            warn!(
                "Message not in cache. Ignoring forwarding. Message Id: {}",
                message_id
            );
            false
        }
    }
}
fn join(&mut self, topic_hash: &TopicHash) {
debug!("Running JOIN for topic: {:?}", topic_hash);
if self.mesh.contains_key(topic_hash) {
info!("JOIN: The topic is already in the mesh, ignoring JOIN");
return;
}
let mut added_peers = vec![];
if let Some((_, peers)) = self.fanout.remove_entry(topic_hash) {
debug!(
"JOIN: Removing peers from the fanout for topic: {:?}",
topic_hash
);
let add_peers = std::cmp::min(peers.len(), self.config.mesh_n);
debug!(
"JOIN: Adding {:?} peers from the fanout for topic: {:?}",
add_peers, topic_hash
);
added_peers.extend(peers.iter().cloned().take(add_peers));
self.mesh.insert(
topic_hash.clone(),
peers.into_iter().take(add_peers).collect(),
);
self.fanout_last_pub.remove(topic_hash);
}
if added_peers.len() < self.config.mesh_n {
let new_peers = Self::get_random_peers(
&self.topic_peers,
topic_hash,
self.config.mesh_n - added_peers.len(),
|peer| !added_peers.contains(peer),
);
added_peers.extend(new_peers.clone());
debug!(
"JOIN: Inserting {:?} random peers into the mesh",
new_peers.len()
);
let mesh_peers = self
.mesh
.entry(topic_hash.clone())
.or_insert_with(Default::default);
mesh_peers.extend(new_peers);
}
for peer_id in added_peers {
info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
Self::control_pool_add(
&mut self.control_pool,
peer_id.clone(),
GossipsubControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
}
debug!("Completed JOIN for topic: {:?}", topic_hash);
}
/// Leaves the mesh of `topic_hash`.
///
/// Removes the topic's mesh entry and queues a PRUNE control action for every peer
/// that was in it.
fn leave(&mut self, topic_hash: &TopicHash) {
    debug!("Running LEAVE for topic {:?}", topic_hash);

    // Iterates over zero peers when the topic had no mesh entry.
    for peer in self.mesh.remove(topic_hash).into_iter().flatten() {
        info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
        Self::control_pool_add(
            &mut self.control_pool,
            peer,
            GossipsubControlAction::Prune {
                topic_hash: topic_hash.clone(),
            },
        );
    }
    debug!("Completed LEAVE for topic: {:?}", topic_hash);
}
/// Handles IHAVE control messages from `peer_id`.
///
/// For every advertised topic we are subscribed to, collects the message ids that are
/// not in the message cache and queues a single IWANT for them.
fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
    debug!("Handling IHAVE for peer: {:?}", peer_id);

    // A set, so an id advertised under several topics is only requested once.
    let mut wanted_ids = HashSet::new();
    for (topic, ids) in ihave_msgs {
        if self.mesh.contains_key(&topic) {
            wanted_ids.extend(ids.into_iter().filter(|id| self.mcache.get(id).is_none()));
        } else {
            debug!(
                "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
                topic
            );
        }
    }

    if !wanted_ids.is_empty() {
        debug!("IHAVE: Sending IWANT message");
        Self::control_pool_add(
            &mut self.control_pool,
            peer_id.clone(),
            GossipsubControlAction::IWant {
                message_ids: wanted_ids.into_iter().collect(),
            },
        );
    }
    debug!("Completed IHAVE handling for peer: {:?}", peer_id);
}
fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
debug!("Handling IWANT for peer: {:?}", peer_id);
let mut cached_messages = HashMap::new();
for id in iwant_msgs {
if let Some(msg) = self.mcache.get(&id) {
cached_messages.insert(id.clone(), msg.clone());
}
}
if !cached_messages.is_empty() {
debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
let message_list = cached_messages.into_iter().map(|entry| entry.1).collect();
self.send_message(
peer_id.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
},
);
}
debug!("Completed IWANT handling for peer: {:?}", peer_id);
}
/// Handles GRAFT control messages from `peer_id`.
///
/// The peer is added to the mesh of every listed topic we are subscribed to. For the
/// remaining topics a PRUNE is sent back immediately.
fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
    debug!("Handling GRAFT message for peer: {:?}", peer_id);

    let mut not_subscribed = HashSet::new();
    for topic_hash in topics {
        match self.mesh.get_mut(&topic_hash) {
            Some(mesh_peers) => {
                info!(
                    "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
                    peer_id, topic_hash
                );
                mesh_peers.insert(peer_id.clone());
            }
            None => {
                not_subscribed.insert(topic_hash);
            }
        }
    }

    if !not_subscribed.is_empty() {
        let prune_messages = not_subscribed
            .into_iter()
            .map(|topic_hash| GossipsubControlAction::Prune { topic_hash })
            .collect();
        info!(
            "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}",
            peer_id
        );
        self.send_message(
            peer_id.clone(),
            GossipsubRpc {
                subscriptions: Vec::new(),
                messages: Vec::new(),
                control_msgs: prune_messages,
            },
        );
    }
    debug!("Completed GRAFT handling for peer: {:?}", peer_id);
}
/// Handles PRUNE control messages from `peer_id` by removing the peer from the mesh
/// of every listed topic.
fn handle_prune(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
    debug!("Handling PRUNE message for peer: {}", peer_id.to_string());
    for topic_hash in topics {
        let was_in_mesh = self
            .mesh
            .get_mut(&topic_hash)
            .map_or(false, |mesh_peers| mesh_peers.remove(peer_id));
        if was_in_mesh {
            info!(
                "PRUNE: Removing peer: {} from the mesh for topic: {:?}",
                peer_id.to_string(),
                topic_hash
            );
        }
    }
    debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
}
/// Handles a message received from `propagation_source`.
///
/// Duplicates (by message id) are dropped. New messages are stored in the message
/// cache and, if we are subscribed to any of their topics, reported to the
/// application. When `validate_messages` is disabled the message is marked as
/// validated and forwarded right away; otherwise forwarding is left to
/// `validate_message`.
fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) {
    // The id is computed once and reused for de-duplication, the event and logging.
    let msg_id = (self.config.message_id_fn)(&msg);
    debug!(
        "Handling message: {:?} from peer: {}",
        msg_id,
        propagation_source.to_string()
    );

    let forward_immediately = !self.config.validate_messages;
    if forward_immediately {
        msg.validated = true;
    }

    if self.duplication_cache.insert(msg_id.clone(), ()).is_some() {
        debug!("Message already received, ignoring. Message: {:?}", msg_id);
        return;
    }
    self.mcache.put(msg.clone());

    // Only surface the message to the application for topics we are subscribed to.
    let subscribed = msg
        .topics
        .iter()
        .any(|topic| self.mesh.contains_key(topic));
    if subscribed {
        debug!("Sending received message to user");
        self.events.push_back(NetworkBehaviourAction::GenerateEvent(
            GossipsubEvent::Message(propagation_source.clone(), msg_id.clone(), msg.clone()),
        ));
    }

    if forward_immediately {
        self.forward_msg(msg, Some(propagation_source));
        debug!("Completed message handling for message: {:?}", msg_id);
    }
}
/// Handles subscription changes announced by `propagation_source`.
///
/// Updates `topic_peers` and `peer_topics` for every entry, and emits a `Subscribed` or
/// `Unsubscribed` event per entry. A subscribing peer is also added to our mesh for the
/// topic (and sent a GRAFT) when that mesh has fewer than `mesh_n_low` peers; an
/// unsubscribing peer is removed from our mesh for the topic.
///
/// Subscriptions from a peer that is not in `peer_topics` are logged and ignored.
fn handle_received_subscriptions(
&mut self,
subscriptions: &[GossipsubSubscription],
propagation_source: &PeerId,
) {
debug!(
"Handling subscriptions: {:?}, from source: {}",
subscriptions,
propagation_source.to_string()
);
// Topics the sending peer is currently known to be subscribed to.
let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
Some(topics) => topics,
None => {
error!(
"Subscription by unknown peer: {}",
propagation_source.to_string()
);
return;
}
};
// GRAFTs to send back to the peer once all subscriptions have been processed.
let mut grafts = Vec::new();
// Application events, queued only after the GRAFT message (if any).
let mut application_event = Vec::new();
for subscription in subscriptions {
// Note: this creates an (initially empty) peer list for topics not seen before,
// for both subscribe and unsubscribe actions.
let peer_list = self
.topic_peers
.entry(subscription.topic_hash.clone())
.or_insert_with(Default::default);
match subscription.action {
GossipsubSubscriptionAction::Subscribe => {
if peer_list.insert(propagation_source.clone()) {
debug!(
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
}
subscribed_topics.insert(subscription.topic_hash.clone());
// If we are subscribed to the topic and our mesh is below the low watermark,
// take the peer into the mesh right away.
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
if peers.len() < self.config.mesh_n_low {
if peers.insert(propagation_source.clone()) {
debug!(
"SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
debug!(
"Sending GRAFT to peer {} for topic {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
grafts.push(GossipsubControlAction::Graft {
topic_hash: subscription.topic_hash.clone(),
});
}
}
}
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
GossipsubSubscriptionAction::Unsubscribe => {
if peer_list.remove(propagation_source) {
info!(
"SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
propagation_source.to_string(),
subscription.topic_hash
);
}
subscribed_topics.remove(&subscription.topic_hash);
// The peer no longer wants the topic, so drop it from our mesh as well.
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
peers.remove(propagation_source);
}
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
peer_id: propagation_source.clone(),
topic: subscription.topic_hash.clone(),
},
));
}
}
}
// All GRAFTs for this peer are sent in a single RPC.
if !grafts.is_empty() {
self.send_message(
propagation_source.clone(),
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: grafts,
},
);
}
for event in application_event {
self.events.push_back(event);
}
trace!(
"Completed handling subscriptions from source: {:?}",
propagation_source
);
}
/// Periodic maintenance, run on every tick of the heartbeat timer.
///
/// In order:
/// 1. Tops up meshes that have fewer than `mesh_n_low` peers and trims meshes that
///    have more than `mesh_n_high` peers, both towards `mesh_n`.
/// 2. Drops fanout topics that have not been published to within `fanout_ttl`.
/// 3. Removes fanout peers that are no longer subscribed (or connected) and tops the
///    fanout up to `mesh_n` peers.
/// 4. Emits gossip, sends the resulting GRAFT/PRUNE messages, flushes the control pool
///    and shifts the message cache.
fn heartbeat(&mut self) {
    debug!("Starting heartbeat");

    let mut to_graft = HashMap::new();
    let mut to_prune = HashMap::new();

    // Mesh maintenance.
    for (topic_hash, peers) in self.mesh.iter_mut() {
        // Too few peers: graft random subscribers that are not yet in the mesh.
        if peers.len() < self.config.mesh_n_low {
            debug!(
                "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
                topic_hash,
                peers.len(),
                self.config.mesh_n_low
            );
            // Saturating: a configuration with `mesh_n < mesh_n_low` must not underflow.
            let desired_peers = self.config.mesh_n.saturating_sub(peers.len());
            let peer_list =
                Self::get_random_peers(&self.topic_peers, topic_hash, desired_peers, |peer| {
                    !peers.contains(peer)
                });
            for peer in &peer_list {
                let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
                current_topic.push(topic_hash.clone());
            }
            debug!("Updating mesh, new mesh: {:?}", peer_list);
            peers.extend(peer_list);
        }

        // Too many peers: prune randomly chosen peers.
        if peers.len() > self.config.mesh_n_high {
            debug!(
                "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
                topic_hash,
                peers.len(),
                self.config.mesh_n_high
            );
            // Saturating: a configuration with `mesh_n > mesh_n_high` must not underflow.
            let excess_peer_no = peers.len().saturating_sub(self.config.mesh_n);
            let mut rng = thread_rng();
            let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
            shuffled.shuffle(&mut rng);
            for _ in 0..excess_peer_no {
                let peer = shuffled
                    .pop()
                    .expect("There should always be enough peers to remove");
                peers.remove(&peer);
                let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
                current_topic.push(topic_hash.clone());
            }
        }
    }

    // Expire fanout topics we have not published to recently.
    {
        // Locals so the closure below does not need to borrow all of `self`.
        let fanout = &mut self.fanout;
        let fanout_ttl = self.config.fanout_ttl;
        let now = Instant::now();
        self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
            if *last_pub_time + fanout_ttl < now {
                debug!(
                    "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
                    topic_hash
                );
                fanout.remove(&topic_hash);
                return false;
            }
            true
        });
    }

    // Fanout maintenance.
    for (topic_hash, peers) in self.fanout.iter_mut() {
        // Collect first, remove afterwards: the set cannot be mutated while iterated.
        let mut to_remove_peers = Vec::new();
        for peer in peers.iter() {
            match self.peer_topics.get(peer) {
                Some(topics) => {
                    if !topics.contains(&topic_hash) {
                        debug!(
                            "HEARTBEAT: Peer removed from fanout for topic: {:?}",
                            topic_hash
                        );
                        to_remove_peers.push(peer.clone());
                    }
                }
                // The peer is no longer connected.
                None => {
                    to_remove_peers.push(peer.clone());
                }
            }
        }
        for to_remove in to_remove_peers {
            peers.remove(&to_remove);
        }

        if peers.len() < self.config.mesh_n {
            debug!(
                "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
                peers.len(),
                self.config.mesh_n
            );
            let needed_peers = self.config.mesh_n - peers.len();
            let new_peers =
                Self::get_random_peers(&self.topic_peers, topic_hash, needed_peers, |peer| {
                    !peers.contains(peer)
                });
            peers.extend(new_peers);
        }
    }

    self.emit_gossip();

    if !to_graft.is_empty() || !to_prune.is_empty() {
        self.send_graft_prune(to_graft, to_prune);
    }

    // Send everything queued in the control pool (e.g. IHAVE from `emit_gossip`).
    self.flush_control_pool();

    self.mcache.shift();
    debug!("Completed Heartbeat");
}
/// Queues IHAVE gossip for every mesh and fanout topic.
///
/// For each topic with message ids to gossip, up to `gossip_lazy` random subscribers
/// that are *not* in the topic's mesh/fanout set are selected and an IHAVE listing
/// those ids is added to their control pool entry.
fn emit_gossip(&mut self) {
    for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
        let message_ids = self.mcache.get_gossip_ids(&topic_hash);
        if message_ids.is_empty() {
            // Nothing to gossip for this topic; the remaining topics must still be
            // processed, so skip rather than abort the whole round.
            continue;
        }

        let to_msg_peers = Self::get_random_peers(
            &self.topic_peers,
            &topic_hash,
            self.config.gossip_lazy,
            |peer| !peers.contains(peer),
        );

        debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());

        for peer in to_msg_peers {
            Self::control_pool_add(
                &mut self.control_pool,
                peer,
                GossipsubControlAction::IHave {
                    topic_hash: topic_hash.clone(),
                    message_ids: message_ids.clone(),
                },
            );
        }
    }
}
/// Sends the GRAFT and PRUNE control messages collected during a heartbeat.
///
/// A peer that has both grafts and prunes receives them in one RPC (grafts first);
/// peers with only prunes receive a separate RPC each.
fn send_graft_prune(
    &mut self,
    to_graft: HashMap<PeerId, Vec<TopicHash>>,
    mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
) {
    for (peer, graft_topics) in to_graft {
        let mut control_msgs: Vec<GossipsubControlAction> = graft_topics
            .into_iter()
            .map(|topic_hash| GossipsubControlAction::Graft { topic_hash })
            .collect();

        // Piggy-back this peer's prunes and take them out of the pending map.
        if let Some(prune_topics) = to_prune.remove(&peer) {
            control_msgs.extend(
                prune_topics
                    .into_iter()
                    .map(|topic_hash| GossipsubControlAction::Prune { topic_hash }),
            );
        }

        self.send_message(
            peer,
            GossipsubRpc {
                subscriptions: Vec::new(),
                messages: Vec::new(),
                control_msgs,
            },
        );
    }

    // Whatever is left only needs prunes.
    for (peer, prune_topics) in to_prune {
        let control_msgs = prune_topics
            .into_iter()
            .map(|topic_hash| GossipsubControlAction::Prune { topic_hash })
            .collect();
        self.send_message(
            peer,
            GossipsubRpc {
                subscriptions: Vec::new(),
                messages: Vec::new(),
                control_msgs,
            },
        );
    }
}
/// Forwards `message` to the mesh peers of all of its topics, except `source`.
///
/// Returns `true` if the message was queued for at least one peer.
fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool {
    let msg_id = (self.config.message_id_fn)(&message);
    debug!("Forwarding message: {:?}", msg_id);

    // Union of the mesh peers over all topics of the message, minus the sender.
    let recipients: HashSet<PeerId> = message
        .topics
        .iter()
        .filter_map(|topic| self.mesh.get(topic))
        .flatten()
        .filter(|peer_id| Some(*peer_id) != source)
        .cloned()
        .collect();

    if recipients.is_empty() {
        return false;
    }

    let rpc = Arc::new(GossipsubRpc {
        subscriptions: Vec::new(),
        messages: vec![message],
        control_msgs: Vec::new(),
    });
    for peer in recipients {
        debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
        self.send_message(peer, rpc.clone());
    }
    debug!("Completed forwarding message");
    true
}
/// Builds a message for `topics` carrying `data`, according to the publish config.
///
/// * `Signing`: source is the author, a random sequence number is attached and the
///   message is signed over `SIGNING_PREFIX` followed by the protobuf encoding of the
///   unsigned message; the inline key (if any) is attached.
/// * `Author` / `RandomAuthor`: unsigned, with the configured or a random source and a
///   random sequence number.
/// * `Anonymous`: unsigned, without source and sequence number.
///
/// All built messages are marked as validated.
///
/// # Errors
///
/// Returns the `SigningError` produced by the keypair if signing fails.
pub(crate) fn build_message(
    &self,
    topics: Vec<TopicHash>,
    data: Vec<u8>,
) -> Result<GossipsubMessage, SigningError> {
    // Only the authorship-related fields differ between the configurations.
    let (source, sequence_number, signature, key) = match &self.publish_config {
        PublishConfig::Signing {
            keypair,
            author,
            inline_key,
        } => {
            let sequence_number: u64 = rand::random();

            // The signature covers the message without its signature and key fields.
            let unsigned = rpc_proto::Message {
                from: Some(author.clone().into_bytes()),
                data: Some(data.clone()),
                seqno: Some(sequence_number.to_be_bytes().to_vec()),
                topic_ids: topics.clone().into_iter().map(|t| t.into()).collect(),
                signature: None,
                key: None,
            };
            let mut encoded = Vec::with_capacity(unsigned.encoded_len());
            unsigned
                .encode(&mut encoded)
                .expect("Buffer has sufficient capacity");

            let mut to_sign = SIGNING_PREFIX.to_vec();
            to_sign.extend_from_slice(&encoded);
            let signature = keypair.sign(&to_sign)?;

            (
                Some(author.clone()),
                Some(sequence_number),
                Some(signature),
                inline_key.clone(),
            )
        }
        PublishConfig::Author(peer_id) => {
            (Some(peer_id.clone()), Some(rand::random()), None, None)
        }
        PublishConfig::RandomAuthor => {
            (Some(PeerId::random()), Some(rand::random()), None, None)
        }
        PublishConfig::Anonymous => (None, None, None, None),
    };

    Ok(GossipsubMessage {
        source,
        data,
        sequence_number,
        topics,
        signature,
        key,
        validated: true,
    })
}
/// Selects up to `n` random peers subscribed to `topic_hash` that satisfy `f`.
///
/// If at most `n` subscribers pass the filter, all of them are returned. Unknown
/// topics yield an empty set.
fn get_random_peers(
    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
    topic_hash: &TopicHash,
    n: usize,
    mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
    let mut gossip_peers: Vec<PeerId> = match topic_peers.get(topic_hash) {
        Some(peer_list) => peer_list.iter().filter(|p| f(p)).cloned().collect(),
        None => Vec::new(),
    };

    // Not more candidates than requested: no random selection needed.
    if gossip_peers.len() <= n {
        debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
        return gossip_peers.into_iter().collect();
    }

    // `partial_shuffle` returns the randomly selected elements as its first slice.
    // That slice must be used: it is not guaranteed to be the front of the vector,
    // and taking the first `n` elements instead would bias the choice towards the
    // lowest peer ids (the candidates start out sorted).
    let mut rng = thread_rng();
    let (selected, _) = gossip_peers.partial_shuffle(&mut rng, n);

    debug!("RANDOM PEERS: Got {:?} peers", n);

    selected.iter().cloned().collect()
}
/// Appends `control` to the pending control actions of `peer`, creating the peer's
/// entry if it does not exist yet.
fn control_pool_add(
    control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
    peer: PeerId,
    control: GossipsubControlAction,
) {
    // `peer` is owned, so it can be moved into the map without cloning.
    control_pool.entry(peer).or_default().push(control);
}
/// Converts `topic` into its `TopicHash`: the SHA-256 hash form when `hash_topics` is
/// enabled, the un-hashed form otherwise.
fn topic_hash(&self, topic: Topic) -> TopicHash {
    if !self.config.hash_topics {
        return topic.no_hash();
    }
    topic.sha256_hash()
}
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
self.send_message(
peer,
GossipsubRpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: controls,
},
);
}
}
/// Queues `message` for delivery to `peer_id` through any of the peer's handlers.
///
/// The message is only enqueued here; it is handed to the swarm by `poll`.
fn send_message(&mut self, peer_id: PeerId, message: impl Into<Arc<GossipsubRpc>>) {
    let action = NetworkBehaviourAction::NotifyHandler {
        peer_id,
        event: message.into(),
        handler: NotifyHandler::Any,
    };
    self.events.push_back(action);
}
}
impl NetworkBehaviour for Gossipsub {
type ProtocolsHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
/// Creates a connection handler configured with the behaviour's protocol id, maximum
/// transmit size and validation mode.
fn new_handler(&mut self) -> Self::ProtocolsHandler {
    let protocol_id = self.config.protocol_id.clone();
    let max_transmit_size = self.config.max_transmit_size;
    let validation_mode = self.config.validation_mode.clone();
    GossipsubHandler::new(protocol_id, max_transmit_size, validation_mode)
}
/// This behaviour does not track peer addresses, so the list is always empty.
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
    vec![]
}
fn inject_connected(&mut self, id: &PeerId) {
info!("New peer connected: {:?}", id);
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
});
}
if !subscriptions.is_empty() {
self.send_message(
id.clone(),
GossipsubRpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
},
);
}
self.peer_topics.insert(id.clone(), Default::default());
}
fn inject_disconnected(&mut self, id: &PeerId) {
debug!("Peer disconnected: {:?}", id);
{
let topics = match self.peer_topics.get(id) {
Some(topics) => (topics),
None => {
warn!("Disconnected node, not in connected nodes");
return;
}
};
for topic in topics {
if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
mesh_peers.remove(id);
}
if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
if !peer_list.remove(id) {
warn!("Disconnected node: {:?} not in topic_peers peer list", &id);
}
} else {
warn!(
"Disconnected node: {:?} with topic: {:?} not in topic_peers",
&id, &topic
);
}
self.fanout.get_mut(&topic).map(|peers| peers.remove(id));
}
}
let was_in = self.peer_topics.remove(id);
debug_assert!(was_in.is_some());
}
fn inject_event(&mut self, propagation_source: PeerId, _: ConnectionId, event: GossipsubRpc) {
if !event.subscriptions.is_empty() {
self.handle_received_subscriptions(&event.subscriptions, &propagation_source);
}
for message in event.messages {
self.handle_received_message(message, &propagation_source);
}
let mut ihave_msgs = vec![];
let mut graft_msgs = vec![];
let mut prune_msgs = vec![];
for control_msg in event.control_msgs {
match control_msg {
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => {
ihave_msgs.push((topic_hash, message_ids));
}
GossipsubControlAction::IWant { message_ids } => {
self.handle_iwant(&propagation_source, message_ids)
}
GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
GossipsubControlAction::Prune { topic_hash } => prune_msgs.push(topic_hash),
}
}
if !ihave_msgs.is_empty() {
self.handle_ihave(&propagation_source, ihave_msgs);
}
if !graft_msgs.is_empty() {
self.handle_graft(&propagation_source, graft_msgs);
}
if !prune_msgs.is_empty() {
self.handle_prune(&propagation_source, prune_msgs);
}
}
/// Drives the behaviour.
///
/// Returns the next queued action if there is one. Otherwise runs a heartbeat for
/// every elapsed tick of the heartbeat timer and returns `Poll::Pending`.
///
/// NOTE(review): actions queued by a heartbeat are not returned by the same call; they
/// appear to be delivered on the next call to `poll` - confirm this is intended.
fn poll(
    &mut self,
    cx: &mut Context<'_>,
    _: &mut impl PollParameters,
) -> Poll<
    NetworkBehaviourAction<
        <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
        Self::OutEvent,
    >,
> {
    if let Some(queued) = self.events.pop_front() {
        // Queued actions carry the RPC behind an `Arc`; the swarm expects it by value.
        let action = match queued {
            NetworkBehaviourAction::NotifyHandler {
                peer_id,
                handler,
                event: shared_rpc,
            } => {
                // Take the RPC out if this is the last reference, clone it otherwise.
                let event = Arc::try_unwrap(shared_rpc)
                    .unwrap_or_else(|still_shared| (*still_shared).clone());
                NetworkBehaviourAction::NotifyHandler {
                    peer_id,
                    event,
                    handler,
                }
            }
            NetworkBehaviourAction::GenerateEvent(e) => {
                NetworkBehaviourAction::GenerateEvent(e)
            }
            NetworkBehaviourAction::DialAddress { address } => {
                NetworkBehaviourAction::DialAddress { address }
            }
            NetworkBehaviourAction::DialPeer { peer_id, condition } => {
                NetworkBehaviourAction::DialPeer { peer_id, condition }
            }
            NetworkBehaviourAction::ReportObservedAddr { address } => {
                NetworkBehaviourAction::ReportObservedAddr { address }
            }
        };
        return Poll::Ready(action);
    }

    while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
        self.heartbeat();
    }

    Poll::Pending
}
}
/// A single RPC exchanged with a peer: any combination of published messages,
/// subscription changes and control messages.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubRpc {
/// Messages being published or forwarded.
pub messages: Vec<GossipsubMessage>,
/// Subscribe/unsubscribe announcements.
pub subscriptions: Vec<GossipsubSubscription>,
/// Control messages (IHAVE, IWANT, GRAFT, PRUNE).
pub control_msgs: Vec<GossipsubControlAction>,
}
impl fmt::Debug for GossipsubRpc {
    /// Formats the RPC as a struct, leaving out every field that is empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // (field name, is empty, value) in output order.
        let fields: [(&str, bool, &dyn fmt::Debug); 3] = [
            ("messages", self.messages.is_empty(), &self.messages),
            (
                "subscriptions",
                self.subscriptions.is_empty(),
                &self.subscriptions,
            ),
            (
                "control_msgs",
                self.control_msgs.is_empty(),
                &self.control_msgs,
            ),
        ];

        let mut out = f.debug_struct("GossipsubRpc");
        for (name, is_empty, value) in fields.iter() {
            if !*is_empty {
                out.field(name, *value);
            }
        }
        out.finish()
    }
}
/// Events reported by the gossipsub behaviour to the application.
#[derive(Clone, Debug)]
pub enum GossipsubEvent {
/// A message on a subscribed topic was received. Carries the peer that propagated the
/// message to us, the message id and the message itself.
Message(PeerId, MessageId, GossipsubMessage),
/// A remote peer subscribed to a topic.
Subscribed {
/// The peer that subscribed.
peer_id: PeerId,
/// The topic it subscribed to.
topic: TopicHash,
},
/// A remote peer unsubscribed from a topic.
Unsubscribed {
/// The peer that unsubscribed.
peer_id: PeerId,
/// The topic it unsubscribed from.
topic: TopicHash,
},
}
/// Checks that the publishing authenticity is compatible with the validation mode
/// applied to incoming messages.
///
/// # Panics
///
/// * In `ValidationMode::Anonymous`, if messages would be signed or would carry an
///   author.
/// * In `ValidationMode::Strict`, if messages would not be signed.
fn validate_config(authenticity: &MessageAuthenticity, validation_mode: &ValidationMode) {
    match validation_mode {
        ValidationMode::Anonymous => {
            if authenticity.is_signing() {
                panic!("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
            }

            if !authenticity.is_anonymous() {
                panic!("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
            }
        }
        ValidationMode::Strict => {
            if !authenticity.is_signing() {
                // Kept on a single line: a literal spanning several source lines would
                // embed the line breaks in the panic message.
                panic!("Messages will be published unsigned and incoming unsigned messages will be rejected. Consider adjusting the validation or privacy settings in the config");
            }
        }
        // Every other validation mode accepts any authenticity setting.
        _ => {}
    }
}
impl fmt::Debug for Gossipsub {
    /// Formats the behaviour's state as a struct. The duplicate cache is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Gossipsub");
        out.field("config", &self.config);
        out.field("events", &self.events);
        out.field("control_pool", &self.control_pool);
        out.field("publish_config", &self.publish_config);
        out.field("topic_peers", &self.topic_peers);
        out.field("peer_topics", &self.peer_topics);
        out.field("mesh", &self.mesh);
        out.field("fanout", &self.fanout);
        out.field("fanout_last_pub", &self.fanout_last_pub);
        out.field("mcache", &self.mcache);
        out.field("heartbeat", &self.heartbeat);
        out.finish()
    }
}
impl fmt::Debug for PublishConfig {
    /// Formats the variant name and, where present, the author. The keypair and inline
    /// key of `Signing` are never printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishConfig::Signing { author, .. } => {
                write!(f, "PublishConfig::Signing({})", author)
            }
            PublishConfig::Author(author) => write!(f, "PublishConfig::Author({})", author),
            PublishConfig::RandomAuthor => f.write_str("PublishConfig::RandomAuthor"),
            PublishConfig::Anonymous => f.write_str("PublishConfig::Anonymous"),
        }
    }
}