use std::{
collections::{HashSet, HashMap},
sync::Arc,
};
use fnv::{FnvHashSet, FnvHashMap};
use sp_core::storage::{StorageKey, StorageData};
use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use prometheus_endpoint::{Registry, CounterVec, Opts, U64, register};
#[derive(Debug)]
pub struct StorageChangeSet {
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
child_changes: Arc<Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>>,
filter: Option<HashSet<StorageKey>>,
child_filters: Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
}
impl StorageChangeSet {
pub fn iter<'a>(&'a self)
-> impl Iterator<Item=(Option<&'a StorageKey>, &'a StorageKey, Option<&'a StorageData>)> + 'a {
let top = self.changes
.iter()
.filter(move |&(key, _)| match self.filter {
Some(ref filter) => filter.contains(key),
None => true,
})
.map(move |(k,v)| (None, k, v.as_ref()));
let children = self.child_changes
.iter()
.filter_map(move |(sk, changes)| {
if let Some(cf) = self.child_filters.as_ref() {
if let Some(filter) = cf.get(sk) {
Some(changes
.iter()
.filter(move |&(key, _)| match filter {
Some(ref filter) => filter.contains(key),
None => true,
})
.map(move |(k,v)| (Some(sk), k, v.as_ref())))
} else { None }
} else { None }
})
.flatten();
top.chain(children)
}
}
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
type SubscriberId = u64;
type SubscribersGauge = CounterVec<U64>;
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
listeners: HashMap<StorageKey, FnvHashSet<SubscriberId>>,
child_listeners: HashMap<StorageKey, (
HashMap<StorageKey, FnvHashSet<SubscriberId>>,
FnvHashSet<SubscriberId>
)>,
sinks: FnvHashMap<SubscriberId, (
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>,
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
)>,
}
impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
Self {
metrics: Default::default(),
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
child_listeners: Default::default(),
sinks: Default::default(),
}
}
}
impl<Block: BlockT> StorageNotifications<Block> {
pub fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r|
CounterVec::new(
Opts::new(
"storage_notification_subscribers",
"Number of subscribers in storage notification sytem"
),
&["action"],
).and_then(|g| register(g, &r))
.ok()
);
StorageNotifications {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
listeners: Default::default(),
child_listeners: Default::default(),
sinks: Default::default(),
}
}
pub fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item=(Vec<u8>, impl Iterator<Item=(Vec<u8>, Option<Vec<u8>>)>)
>,
) {
let has_wildcard = !self.wildcard_listeners.is_empty();
if !has_wildcard && self.listeners.is_empty() && self.child_listeners.is_empty() {
return;
}
let mut subscribers = self.wildcard_listeners.clone();
let mut changes = Vec::new();
let mut child_changes = Vec::new();
for (k, v) in changeset {
let k = StorageKey(k);
let listeners = self.listeners.get(&k);
if let Some(ref listeners) = listeners {
subscribers.extend(listeners.iter());
}
if has_wildcard || listeners.is_some() {
changes.push((k, v.map(StorageData)));
}
}
for (sk, changeset) in child_changeset {
let sk = StorageKey(sk);
if let Some((cl, cw)) = self.child_listeners.get(&sk) {
let mut changes = Vec::new();
for (k, v) in changeset {
let k = StorageKey(k);
let listeners = cl.get(&k);
if let Some(ref listeners) = listeners {
subscribers.extend(listeners.iter());
}
subscribers.extend(cw.iter());
if !cw.is_empty() || listeners.is_some() {
changes.push((k, v.map(StorageData)));
}
}
if !changes.is_empty() {
child_changes.push((sk, changes));
}
}
}
if changes.is_empty() && child_changes.is_empty() {
return;
}
let changes = Arc::new(changes);
let child_changes = Arc::new(child_changes);
let to_remove = self.sinks
.iter()
.filter_map(|(subscriber, &(ref sink, ref filter, ref child_filters))| {
let should_remove = {
if subscribers.contains(subscriber) {
sink.unbounded_send((hash.clone(), StorageChangeSet {
changes: changes.clone(),
child_changes: child_changes.clone(),
filter: filter.clone(),
child_filters: child_filters.clone(),
})).is_err()
} else {
sink.is_closed()
}
};
if should_remove {
Some(subscriber.clone())
} else {
None
}
}).collect::<Vec<_>>();
for sub_id in to_remove {
self.remove_subscriber(sub_id);
}
}
fn remove_subscriber_from(
subscriber: &SubscriberId,
filters: &Option<HashSet<StorageKey>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
){
match filters {
None => {
wildcards.remove(subscriber);
},
Some(filters) => {
for key in filters.iter() {
let remove_key = match listeners.get_mut(key) {
Some(ref mut set) => {
set.remove(subscriber);
set.is_empty()
},
None => false,
};
if remove_key {
listeners.remove(key);
}
}
}
}
}
fn remove_subscriber(&mut self, subscriber: SubscriberId) {
if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);
if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
}
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}
}
}
fn listen_from(
current_id: SubscriberId,
filter_keys: &Option<impl AsRef<[StorageKey]>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) -> Option<HashSet<StorageKey>>
{
match filter_keys {
None => {
wildcards.insert(current_id);
None
},
Some(keys) => Some(keys.as_ref().iter().map(|key| {
listeners
.entry(key.clone())
.or_insert_with(Default::default)
.insert(current_id);
key.clone()
}).collect())
}
}
pub fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
self.next_id += 1;
let current_id = self.next_id;
let keys = Self::listen_from(
current_id,
&filter_keys,
&mut self.listeners,
&mut self.wildcard_listeners,
);
let child_keys = filter_child_keys.map(|filter_child_keys| {
filter_child_keys.iter().map(|(c_key, o_keys)| {
let (c_listeners, c_wildcards) = self.child_listeners
.entry(c_key.clone())
.or_insert_with(Default::default);
(c_key.clone(), Self::listen_from(
current_id,
o_keys,
&mut *c_listeners,
&mut *c_wildcards,
))
}).collect()
});
let (tx, rx) = tracing_unbounded("mpsc_storage_notification_items");
self.sinks.insert(current_id, (tx, keys, child_keys));
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"added"]).inc();
}
rx
}
}
#[cfg(test)]
mod tests {
use sp_runtime::testing::{H256 as Hash, Block as RawBlock, ExtrinsicWrapper};
use super::*;
use std::iter::{empty, Empty};
type TestChangeSet = (
Vec<(StorageKey, Option<StorageData>)>,
Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>,
);
#[cfg(test)]
impl From<TestChangeSet> for StorageChangeSet {
fn from(changes: TestChangeSet) -> Self {
let child_filters = Some([
(StorageKey(vec![4]), None),
(StorageKey(vec![5]), None),
].iter().cloned().collect());
StorageChangeSet {
changes: Arc::new(changes.0),
child_changes: Arc::new(changes.1),
filter: None,
child_filters,
}
}
}
#[cfg(test)]
impl PartialEq for StorageChangeSet {
fn eq(&self, other: &Self) -> bool {
self.iter().eq(other.iter())
}
}
type Block = RawBlock<ExtrinsicWrapper<Hash>>;
#[test]
fn triggering_change_should_notify_wildcard_listeners() {
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), None)];
let mut recv = futures::executor::block_on_stream(
notifications.listen(None, Some(&child_filter[..]))
);
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![3], None),
];
let c_changeset_1 = vec![
(vec![5], Some(vec![4])),
(vec![6], None),
];
let c_changeset = vec![(vec![4], c_changeset_1)];
notifications.trigger(
&Hash::from_low_u64_be(1),
changeset.into_iter(),
c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())),
);
assert_eq!(recv.next().unwrap(), (Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))),
(StorageKey(vec![3]), None),
], vec![(StorageKey(vec![4]), vec![
(StorageKey(vec![5]), Some(StorageData(vec![4]))),
(StorageKey(vec![6]), None),
])]).into()));
}
#[test]
fn should_only_notify_interested_listeners() {
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let mut recv1 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let mut recv2 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let mut recv3 = futures::executor::block_on_stream(
notifications.listen(Some(&[]), Some(&child_filter))
);
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![1], None),
];
let c_changeset_1 = vec![
(vec![5], Some(vec![4])),
(vec![6], None),
];
let c_changeset = vec![(vec![4], c_changeset_1)];
notifications.trigger(
&Hash::from_low_u64_be(1),
changeset.into_iter(),
c_changeset.into_iter().map(|(a,b)| (a, b.into_iter())),
);
assert_eq!(recv1.next().unwrap(), (Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![1]), None),
], vec![]).into()));
assert_eq!(recv2.next().unwrap(), (Hash::from_low_u64_be(1), (vec![
(StorageKey(vec![2]), Some(StorageData(vec![3]))),
], vec![]).into()));
assert_eq!(recv3.next().unwrap(), (Hash::from_low_u64_be(1), (vec![],
vec![
(StorageKey(vec![4]), vec![(StorageKey(vec![5]), Some(StorageData(vec![4])))]),
]).into()));
}
#[test]
fn should_cleanup_subscribers_if_dropped() {
let mut notifications = StorageNotifications::<Block>::default();
{
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let _recv1 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let _recv2 = futures::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let _recv3 = futures::executor::block_on_stream(
notifications.listen(None, None)
);
let _recv4 = futures::executor::block_on_stream(
notifications.listen(None, Some(&child_filter))
);
assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 2);
assert_eq!(notifications.child_listeners.len(), 1);
}
let changeset = vec![
(vec![2], Some(vec![3])),
(vec![1], None),
];
let c_changeset = empty::<(_, Empty<_>)>();
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);
assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0);
assert_eq!(notifications.child_listeners.len(), 0);
}
#[test]
fn should_not_send_empty_notifications() {
let mut recv = {
let mut notifications = StorageNotifications::<Block>::default();
let recv = futures::executor::block_on_stream(notifications.listen(None, None));
let changeset = vec![];
let c_changeset = empty::<(_, Empty<_>)>();
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);
recv
};
assert_eq!(recv.next(), None);
}
}