Skip to content

Commit 349b251

Browse files
committed
Fixed subscribe
1 parent d5a9c55 commit 349b251

File tree

1 file changed

+12
-33
lines changed

1 file changed

+12
-33
lines changed

network-libp2p/src/network.rs

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,9 @@ pub enum NetworkAction {
125125
value: Vec<u8>,
126126
output: oneshot::Sender<Result<(), NetworkError>>,
127127
},
128-
RegisterTopic {
129-
topic_hash: TopicHash,
130-
output: mpsc::Sender<(GossipsubMessage, Arc<Peer>)>,
131-
},
132128
Subscribe {
133129
topic_name: &'static str,
134-
output: oneshot::Sender<TopicHash>,
130+
output: mpsc::Sender<(GossipsubMessage, PeerId)>,
135131
},
136132
Publish {
137133
topic_name: &'static str,
@@ -144,8 +140,7 @@ pub enum NetworkAction {
144140
struct TaskState {
145141
dht_puts: HashMap<QueryId, oneshot::Sender<Result<(), NetworkError>>>,
146142
dht_gets: HashMap<QueryId, oneshot::Sender<Result<Vec<u8>, NetworkError>>>,
147-
gossip_sub: HashMap<TopicHash, oneshot::Sender<TopicHash>>,
148-
gossip_topics: HashMap<TopicHash, mpsc::Sender<(GossipsubMessage, Arc<Peer>)>>,
143+
gossip_topics: HashMap<TopicHash, mpsc::Sender<(GossipsubMessage, PeerId)>>,
149144
}
150145

151146
pub struct Network {
@@ -306,7 +301,7 @@ impl Network {
306301
for topic in msg.topics.iter() {
307302
if let Some(output) = state.gossip_topics.get(&topic) {
308303
// let peer = Self::get_peer(peer_id).unwrap();
309-
// output.send((msg, peer));
304+
output.send((msg, peer));
310305
} else {
311306
log::warn!("Unknown topic hash: {:?}", topic);
312307
}
@@ -365,13 +360,10 @@ impl Network {
365360
}
366361
}
367362
}
368-
NetworkAction::RegisterTopic { topic_hash, output } => {
369-
state.gossip_topics.insert(topic_hash, output);
370-
}
371363
NetworkAction::Subscribe { topic_name, output } => {
372364
let topic = GossipsubTopic::new(topic_name.into());
373365
if swarm.gossipsub.subscribe(topic.clone()) {
374-
state.gossip_sub.insert(topic.sha256_hash(), output);
366+
state.gossip_topics.insert(topic.sha256_hash(), output);
375367
} else {
376368
log::warn!("Already subscribed to topic: {:?}", topic_name);
377369
drop(output);
@@ -413,35 +405,22 @@ impl NetworkInterface for Network {
413405
where
414406
T: Topic + Sync,
415407
{
416-
let (output_tx, output_rx) = oneshot::channel();
408+
let (tx, rx) = mpsc::channel(16);
417409

418410
self.action_tx
419411
.lock()
420412
.await
421413
.send(NetworkAction::Subscribe {
422414
topic_name: topic.topic(),
423-
output: output_tx,
424-
})
425-
.await;
426-
427-
let topic_hash = output_rx.await.expect("Already subscribed to topic");
428-
let (tx, rx) = mpsc::channel(16);
429-
430-
self.action_tx
431-
.lock()
432-
.await
433-
.send(NetworkAction::RegisterTopic {
434-
topic_hash,
435415
output: tx,
436416
})
437-
.await;
438-
439-
let test = rx.map(|(msg, peer)|
440-
{
441-
let item = msg.data;
442-
( item , peer)
443-
}).into_inner();
444-
Box::new(test)
417+
.await
418+
.expect("Couldn't subscribe to pubsub topic");
419+
420+
Box::new(rx.map(|(msg, peer)| {
421+
let item: <T as Topic>::Item = Deserialize::deserialize_from_vec(&msg.data);
422+
(item, peer)
423+
}))
445424
}
446425

447426
async fn publish<T>(&self, topic: &T, item: <T as Topic>::Item) -> Result<(), Self::Error>

0 commit comments

Comments
 (0)