diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 8baa0ded9a..ac796a7074 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -5,7 +5,9 @@ use std::io::Cursor; use anyhow::{Context as _, Result, bail, ensure}; use base64::Engine as _; +use data_encoding::BASE32_NOPAD; use deltachat_contact_tools::sanitize_bidi_characters; +use iroh_gossip::proto::TopicId; use mail_builder::headers::HeaderType; use mail_builder::headers::address::{Address, EmailAddress}; use mail_builder::mime::MimePart; @@ -21,14 +23,14 @@ use crate::contact::{Contact, ContactId, Origin}; use crate::context::Context; use crate::e2ee::EncryptHelper; use crate::ephemeral::Timer as EphemeralTimer; -use crate::key::self_fingerprint; -use crate::key::{DcKey, SignedPublicKey}; +use crate::headerdef::HeaderDef; +use crate::key::{DcKey, SignedPublicKey, self_fingerprint}; use crate::location; use crate::log::{info, warn}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{SystemMessage, is_hidden}; use crate::param::Param; -use crate::peer_channels::create_iroh_header; +use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg}; use crate::simplify::escape_message_footer_marks; use crate::stock_str; use crate::tools::{ @@ -139,6 +141,9 @@ pub struct MimeFactory { /// True if the avatar should be attached. pub attach_selfavatar: bool, + + /// This field is used to sustain the topic id of webxdcs needed for peer channels. + webxdc_topic: Option, } /// Result of rendering a message, ready to be submitted to a send job. @@ -449,7 +454,7 @@ impl MimeFactory { member_timestamps.is_empty() || to.len() + past_members.len() == member_timestamps.len() ); - + let webxdc_topic = get_iroh_topic_for_msg(context, msg.id).await?; let factory = MimeFactory { from_addr, from_displayname, @@ -469,6 +474,7 @@ impl MimeFactory { last_added_location_id: None, sync_ids_to_delete: None, attach_selfavatar, + webxdc_topic, }; Ok(factory) } @@ -516,6 +522,7 @@ impl MimeFactory { last_added_location_id: None, sync_ids_to_delete: None, attach_selfavatar: false, + webxdc_topic: None, }; Ok(res) @@ -1492,7 +1499,7 @@ impl MimeFactory { } SystemMessage::IrohNodeAddr => { headers.push(( - "Iroh-Node-Addr", + HeaderDef::IrohNodeAddr.into(), mail_builder::headers::text::Text::new(serde_json::to_string( &context .get_or_try_init_peer_channel() @@ -1673,10 +1680,13 @@ impl MimeFactory { let json = msg.param.get(Param::Arg).unwrap_or_default(); parts.push(context.build_status_update_part(json)); } else if msg.viewtype == Viewtype::Webxdc { + let topic = self + .webxdc_topic + .map(|top| BASE32_NOPAD.encode(top.as_bytes()).to_ascii_lowercase()) + .unwrap_or(create_iroh_header(context, msg.id).await?); headers.push(( - "Iroh-Gossip-Topic", - mail_builder::headers::raw::Raw::new(create_iroh_header(context, msg.id).await?) - .into(), + HeaderDef::IrohGossipTopic.into(), + mail_builder::headers::raw::Raw::new(topic).into(), )); if let (Some(json), _) = context .render_webxdc_status_update_object( diff --git a/src/peer_channels.rs b/src/peer_channels.rs index 21aaacf036..7941739517 100644 --- a/src/peer_channels.rs +++ b/src/peer_channels.rs @@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage; const PUBLIC_KEY_LENGTH: usize = 32; const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes(); -/// Store iroh peer channels for the context. +/// Store Iroh peer channels for the context. #[derive(Debug)] pub struct Iroh { - /// iroh router needed for iroh peer channels. + /// Iroh router needed for Iroh peer channels. pub(crate) router: iroh::protocol::Router, - /// [Gossip] needed for iroh peer channels. + /// [Gossip] needed for Iroh peer channels. pub(crate) gossip: Gossip, /// Sequence numbers for gossip channels. @@ -109,7 +109,7 @@ impl Iroh { info!( ctx, - "IROH_REALTIME: Joining gossip with peers: {:?}", node_ids, + "IROH_REALTIME: Joining gossip {topic} with peers: {:?}", node_ids, ); // Inform iroh of potentially new node addresses @@ -138,17 +138,11 @@ impl Iroh { Ok(Some(join_rx)) } - /// Add gossip peers to realtime channel if it is already active. - pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec) -> Result<()> { + /// Add gossip peer to realtime channel if it is already active. + pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { if self.iroh_channels.read().await.get(&topic).is_some() { - for peer in &peers { - self.router.endpoint().add_node_addr(peer.clone())?; - } - - self.gossip.subscribe_with_opts( - topic, - JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)), - ); + self.router.endpoint().add_node_addr(peer.clone())?; + self.gossip.subscribe(topic, vec![peer.node_id])?; } Ok(()) } @@ -198,7 +192,7 @@ impl Iroh { } /// Leave the realtime channel for a given topic. - pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> { + pub async fn leave_realtime(&self, topic: TopicId) -> Result<()> { if let Some(channel) = self.iroh_channels.write().await.remove(&topic) { // Dropping the last GossipTopic results in quitting the topic. // It is split into GossipReceiver and GossipSender. @@ -316,10 +310,21 @@ impl Context { } } } + + pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> { + if let Some(iroh) = &*self.iroh.read().await { + info!( + self, + "Adding (maybe existing) peer with id {} to {topic}", peer.node_id + ); + iroh.maybe_add_gossip_peer(topic, peer).await?; + } + Ok(()) + } } /// Cache a peers [NodeId] for one topic. -pub(crate) async fn iroh_add_peer_for_topic( +pub async fn iroh_add_peer_for_topic( ctx: &Context, msg_id: MsgId, topic: TopicId, @@ -336,6 +341,7 @@ pub(crate) async fn iroh_add_peer_for_topic( } /// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`. +/// This should not start iroh, because receiving a NodeAddr does not mean you want to participate. pub async fn add_gossip_peer_from_header( context: &Context, instance_id: MsgId, @@ -348,12 +354,13 @@ pub async fn add_gossip_peer_from_header( return Ok(()); } + let node_addr = + serde_json::from_str::(node_addr).context("Failed to parse node address")?; + info!( context, - "Adding iroh peer with address {node_addr:?} to the topic of {instance_id}." + "Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id ); - let node_addr = - serde_json::from_str::(node_addr).context("Failed to parse node address")?; context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived { msg_id: instance_id, @@ -371,13 +378,12 @@ pub async fn add_gossip_peer_from_header( let relay_server = node_addr.relay_url().map(|relay| relay.as_str()); iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?; - let iroh = context.get_or_try_init_peer_channel().await?; - iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?; + context.maybe_add_gossip_peer(topic, node_addr).await?; Ok(()) } /// Insert topicId into the database so that we can use it to retrieve the topic. -pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> { +pub async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> { ctx.sql .execute( "INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)", @@ -555,9 +561,9 @@ mod tests { use super::*; use crate::{ EventType, - chat::send_msg, + chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg}, message::{Message, Viewtype}, - test_utils::TestContextManager, + test_utils::{TestContext, TestContextManager}, }; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -924,8 +930,30 @@ mod tests { let alice = &mut tcm.alice().await; let bob = &mut tcm.bob().await; + let chat = alice.create_chat(bob).await.id; + + let mut instance = Message::new(Viewtype::File); + instance + .set_file_from_bytes( + alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + None, + ) + .unwrap(); + connect_alice_bob(alice, bob, chat, &mut instance).await + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_webxdc_resend() { + let mut tcm = TestContextManager::new(); + let alice = &mut tcm.alice().await; + let bob = &mut tcm.bob().await; + let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat") + .await + .unwrap(); + // Alice sends webxdc to bob - let alice_chat = alice.create_chat(bob).await; let mut instance = Message::new(Viewtype::File); instance .set_file_from_bytes( @@ -935,7 +963,59 @@ mod tests { None, ) .unwrap(); - send_msg(alice, alice_chat.id, &mut instance).await.unwrap(); + + add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await) + .await + .unwrap(); + + connect_alice_bob(alice, bob, group, &mut instance).await; + + // fiona joins late + let fiona = &mut tcm.fiona().await; + + add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await) + .await + .unwrap(); + + resend_msgs(alice, &[instance.id]).await.unwrap(); + let msg = alice.pop_sent_msg().await; + let fiona_instance = fiona.recv_msg(&msg).await; + fiona_instance.chat_id.accept(fiona).await.unwrap(); + + let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id) + .await + .unwrap() + .unwrap(); + let fiona_advert = fiona.pop_sent_msg().await; + alice.recv_msg_trash(&fiona_advert).await; + + fiona_connect_future.await.unwrap(); + send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into()) + .await + .unwrap(); + + loop { + let event = fiona.evtracker.recv().await.unwrap(); + if let EventType::WebxdcRealtimeData { data, .. } = event.typ { + if data == b"alice -> bob & fiona" { + break; + } else { + panic!( + "Unexpected status update: {}", + String::from_utf8_lossy(&data) + ); + } + } + } + } + + async fn connect_alice_bob( + alice: &mut TestContext, + bob: &mut TestContext, + chat: ChatId, + instance: &mut Message, + ) { + send_msg(alice, chat, instance).await.unwrap(); let alice_webxdc = alice.get_last_msg().await; let webxdc = alice.pop_sent_msg().await; @@ -952,8 +1032,9 @@ mod tests { .unwrap(); let alice_advertisement = alice.pop_sent_msg().await; - send_webxdc_realtime_advertisement(bob, bob_webxdc.id) + let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id) .await + .unwrap() .unwrap(); let bob_advertisement = bob.pop_sent_msg().await; @@ -961,8 +1042,9 @@ mod tests { bob.recv_msg_trash(&alice_advertisement).await; alice.recv_msg_trash(&bob_advertisement).await; - eprintln!("Alice waits for connection"); + eprintln!("Alice and Bob wait for connection"); alice_advertisement_future.await.unwrap(); + bob_advertisement_future.await.unwrap(); // Alice sends ephemeral message eprintln!("Sending ephemeral message"); diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 131167ad9f..f20cf2210f 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -2141,7 +2141,7 @@ RETURNING id created_db_entries.push(row_id); } - // check all parts whether they contain a new logging webxdc + // Maybe set logging xdc and add gossip topics for webxdcs. for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { // check if any part contains a webxdc topic id if part.typ == Viewtype::Webxdc {