-
-
Notifications
You must be signed in to change notification settings - Fork 104
fix: realtime late join #6869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: realtime late join #6869
Changes from all commits
3618321
bb5c107
495481b
63ba7ef
44cb66c
e199f20
9e5a4d4
082ac3a
bad8676
0e01fa1
3d9486e
2d12aec
d08b828
a8da677
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage; | |||||
const PUBLIC_KEY_LENGTH: usize = 32; | ||||||
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); | ||||||
|
||||||
/// Store iroh peer channels for the context. | ||||||
/// Store Iroh peer channels for the context. | ||||||
#[derive(Debug)] | ||||||
pub struct Iroh { | ||||||
/// iroh router needed for iroh peer channels. | ||||||
/// Iroh router needed for Iroh peer channels. | ||||||
pub(crate) router: iroh::protocol::Router, | ||||||
|
||||||
/// [Gossip] needed for iroh peer channels. | ||||||
/// [Gossip] needed for Iroh peer channels. | ||||||
pub(crate) gossip: Gossip, | ||||||
|
||||||
/// Sequence numbers for gossip channels. | ||||||
|
@@ -109,7 +109,7 @@ impl Iroh { | |||||
|
||||||
info!( | ||||||
ctx, | ||||||
"IROH_REALTIME: Joining gossip with peers: {:?}", node_ids, | ||||||
"IROH_REALTIME: Joining gossip {topic} with peers: {:?}", node_ids, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
); | ||||||
|
||||||
// Inform iroh of potentially new node addresses | ||||||
|
@@ -138,17 +138,11 @@ impl Iroh { | |||||
Ok(Some(join_rx)) | ||||||
} | ||||||
|
||||||
/// Add gossip peers to realtime channel if it is already active. | ||||||
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> { | ||||||
/// Add gossip peer to realtime channel if it is already active. | ||||||
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { | ||||||
if self.iroh_channels.read().await.get(&topic).is_some() { | ||||||
for peer in &peers { | ||||||
self.router.endpoint().add_node_addr(peer.clone())?; | ||||||
} | ||||||
|
||||||
self.gossip.subscribe_with_opts( | ||||||
topic, | ||||||
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)), | ||||||
); | ||||||
self.router.endpoint().add_node_addr(peer.clone())?; | ||||||
self.gossip.subscribe(topic, vec![peer.node_id])?; | ||||||
} | ||||||
Ok(()) | ||||||
} | ||||||
|
@@ -198,7 +192,7 @@ impl Iroh { | |||||
} | ||||||
|
||||||
/// Leave the realtime channel for a given topic. | ||||||
pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> { | ||||||
pub async fn leave_realtime(&self, topic: TopicId) -> Result<()> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it needs to be public now? |
||||||
if let Some(channel) = self.iroh_channels.write().await.remove(&topic) { | ||||||
// Dropping the last GossipTopic results in quitting the topic. | ||||||
// It is split into GossipReceiver and GossipSender. | ||||||
|
@@ -316,10 +310,21 @@ impl Context { | |||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { | ||||||
if let Some(iroh) = &*self.iroh.read().await { | ||||||
info!( | ||||||
self, | ||||||
"Adding (maybe existing) peer with id {} to {topic}", peer.node_id | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
); | ||||||
iroh.maybe_add_gossip_peer(topic, peer).await?; | ||||||
} | ||||||
Ok(()) | ||||||
} | ||||||
} | ||||||
|
||||||
/// Cache a peers [NodeId] for one topic. | ||||||
pub(crate) async fn iroh_add_peer_for_topic( | ||||||
pub async fn iroh_add_peer_for_topic( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same question |
||||||
ctx: &Context, | ||||||
msg_id: MsgId, | ||||||
topic: TopicId, | ||||||
|
@@ -336,6 +341,7 @@ pub(crate) async fn iroh_add_peer_for_topic( | |||||
} | ||||||
|
||||||
/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`. | ||||||
/// This should not start iroh, because receiving a NodeAddr does not mean you want to participate. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Also, why is this public? |
||||||
pub async fn add_gossip_peer_from_header( | ||||||
context: &Context, | ||||||
instance_id: MsgId, | ||||||
|
@@ -348,12 +354,13 @@ pub async fn add_gossip_peer_from_header( | |||||
return Ok(()); | ||||||
} | ||||||
|
||||||
let node_addr = | ||||||
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?; | ||||||
|
||||||
info!( | ||||||
context, | ||||||
"Adding iroh peer with address {node_addr:?} to the topic of {instance_id}." | ||||||
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id | ||||||
); | ||||||
let node_addr = | ||||||
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?; | ||||||
|
||||||
context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived { | ||||||
msg_id: instance_id, | ||||||
|
@@ -371,13 +378,12 @@ pub async fn add_gossip_peer_from_header( | |||||
let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); | ||||||
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; | ||||||
|
||||||
let iroh = context.get_or_try_init_peer_channel().await?; | ||||||
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; | ||||||
context.maybe_add_gossip_peer(topic, node_addr).await?; | ||||||
Ok(()) | ||||||
} | ||||||
|
||||||
/// Insert topicId into the database so that we can use it to retrieve the topic. | ||||||
pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> { | ||||||
pub async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same question |
||||||
ctx.sql | ||||||
.execute( | ||||||
"INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)", | ||||||
|
@@ -555,9 +561,9 @@ mod tests { | |||||
use super::*; | ||||||
use crate::{ | ||||||
EventType, | ||||||
chat::send_msg, | ||||||
chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg}, | ||||||
message::{Message, Viewtype}, | ||||||
test_utils::TestContextManager, | ||||||
test_utils::{TestContext, TestContextManager}, | ||||||
}; | ||||||
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] | ||||||
|
@@ -924,8 +930,30 @@ mod tests { | |||||
let alice = &mut tcm.alice().await; | ||||||
let bob = &mut tcm.bob().await; | ||||||
|
||||||
let chat = alice.create_chat(bob).await.id; | ||||||
|
||||||
let mut instance = Message::new(Viewtype::File); | ||||||
instance | ||||||
.set_file_from_bytes( | ||||||
alice, | ||||||
"minimal.xdc", | ||||||
include_bytes!("../test-data/webxdc/minimal.xdc"), | ||||||
None, | ||||||
) | ||||||
.unwrap(); | ||||||
connect_alice_bob(alice, bob, chat, &mut instance).await | ||||||
} | ||||||
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] | ||||||
async fn test_webxdc_resend() { | ||||||
let mut tcm = TestContextManager::new(); | ||||||
let alice = &mut tcm.alice().await; | ||||||
let bob = &mut tcm.bob().await; | ||||||
let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat") | ||||||
.await | ||||||
.unwrap(); | ||||||
|
||||||
// Alice sends webxdc to bob | ||||||
let alice_chat = alice.create_chat(bob).await; | ||||||
let mut instance = Message::new(Viewtype::File); | ||||||
instance | ||||||
.set_file_from_bytes( | ||||||
|
@@ -935,7 +963,59 @@ mod tests { | |||||
None, | ||||||
) | ||||||
.unwrap(); | ||||||
send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); | ||||||
|
||||||
add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await) | ||||||
.await | ||||||
.unwrap(); | ||||||
|
||||||
connect_alice_bob(alice, bob, group, &mut instance).await; | ||||||
|
||||||
// fiona joins late | ||||||
let fiona = &mut tcm.fiona().await; | ||||||
|
||||||
add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await) | ||||||
.await | ||||||
.unwrap(); | ||||||
|
||||||
resend_msgs(alice, &[instance.id]).await.unwrap(); | ||||||
let msg = alice.pop_sent_msg().await; | ||||||
let fiona_instance = fiona.recv_msg(&msg).await; | ||||||
fiona_instance.chat_id.accept(fiona).await.unwrap(); | ||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a check that peer channels aren't initialized, otherwise the test would pass w/o the fix |
||||||
let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id) | ||||||
.await | ||||||
.unwrap() | ||||||
.unwrap(); | ||||||
let fiona_advert = fiona.pop_sent_msg().await; | ||||||
alice.recv_msg_trash(&fiona_advert).await; | ||||||
|
||||||
fiona_connect_future.await.unwrap(); | ||||||
send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into()) | ||||||
.await | ||||||
.unwrap(); | ||||||
|
||||||
loop { | ||||||
let event = fiona.evtracker.recv().await.unwrap(); | ||||||
if let EventType::WebxdcRealtimeData { data, .. } = event.typ { | ||||||
if data == b"alice -> bob & fiona" { | ||||||
break; | ||||||
} else { | ||||||
panic!( | ||||||
"Unexpected status update: {}", | ||||||
String::from_utf8_lossy(&data) | ||||||
); | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
async fn connect_alice_bob( | ||||||
alice: &mut TestContext, | ||||||
bob: &mut TestContext, | ||||||
chat: ChatId, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be clear that it's Alice's ChatId, renaming it or reordering parameters would make sense |
||||||
instance: &mut Message, | ||||||
) { | ||||||
send_msg(alice, chat, instance).await.unwrap(); | ||||||
let alice_webxdc = alice.get_last_msg().await; | ||||||
|
||||||
let webxdc = alice.pop_sent_msg().await; | ||||||
|
@@ -952,17 +1032,19 @@ mod tests { | |||||
.unwrap(); | ||||||
let alice_advertisement = alice.pop_sent_msg().await; | ||||||
|
||||||
send_webxdc_realtime_advertisement(bob, bob_webxdc.id) | ||||||
let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id) | ||||||
.await | ||||||
.unwrap() | ||||||
.unwrap(); | ||||||
let bob_advertisement = bob.pop_sent_msg().await; | ||||||
|
||||||
eprintln!("Receiving advertisements"); | ||||||
bob.recv_msg_trash(&alice_advertisement).await; | ||||||
alice.recv_msg_trash(&bob_advertisement).await; | ||||||
|
||||||
eprintln!("Alice waits for connection"); | ||||||
eprintln!("Alice and Bob wait for connection"); | ||||||
alice_advertisement_future.await.unwrap(); | ||||||
bob_advertisement_future.await.unwrap(); | ||||||
|
||||||
// Alice sends ephemeral message | ||||||
eprintln!("Sending ephemeral message"); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.get_headername()
would read more straightforwardly (and we use it in many other places). But probably we should just renameHeaderDef
toHeaderName
, the current naming looks unclear.