Skip to content

Commit 2632b70

Browse files
authored
Merge pull request #250 from NordSecurity/lipt/event_timestamps
Add timestamps to emmited events
2 parents fa37b9c + 3bcb6b5 commit 2632b70

File tree

13 files changed

+112
-73
lines changed

13 files changed

+112
-73
lines changed

changelog.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1-
### v6.0.0
2-
### **UNRELEASED**
1+
### v5.4.0
2+
### **Unforeseen Moose**
33
---
44
* Update moose tracker to v2.0.0
5+
* Add `moose_app_version` field to config
6+
* Add timestamps to emitted events
7+
* Add DLL signing to Windows builds
8+
9+
---
10+
<br>
511

612
### v5.3.0
713
### **Tungsten Bullet**

drop-transfer/examples/udrop.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
net::IpAddr,
66
path::{Path, PathBuf},
77
sync::Arc,
8-
time::Instant,
8+
time::{Instant, SystemTime},
99
};
1010

1111
use anyhow::Context;
@@ -148,14 +148,14 @@ fn print_event(ev: &Event) {
148148
async fn listen(
149149
service: &mut Service,
150150
storage: &Storage,
151-
rx: &mut mpsc::UnboundedReceiver<Event>,
151+
rx: &mut mpsc::UnboundedReceiver<(Event, SystemTime)>,
152152
out_dir: &Path,
153153
) -> anyhow::Result<()> {
154154
info!("Awaiting events…");
155155

156156
let mut active_file_downloads = BTreeMap::new();
157157
let mut storage = drop_transfer::StorageDispatch::new(storage);
158-
while let Some(ev) = rx.recv().await {
158+
while let Some((ev, _)) = rx.recv().await {
159159
storage.handle_event(&ev).await;
160160
print_event(&ev);
161161
match ev {
@@ -382,14 +382,18 @@ async fn main() -> anyhow::Result<()> {
382382
Ok(())
383383
}
384384

385-
async fn on_stop(service: Service, rx: &mut mpsc::UnboundedReceiver<Event>, storage: &Storage) {
385+
async fn on_stop(
386+
service: Service,
387+
rx: &mut mpsc::UnboundedReceiver<(Event, SystemTime)>,
388+
storage: &Storage,
389+
) {
386390
info!("Stopping the service");
387391

388392
service.stop().await;
389393
let mut storage = drop_transfer::StorageDispatch::new(storage);
390394

391395
// Drain events
392-
while let Some(ev) = rx.recv().await {
396+
while let Some((ev, _)) = rx.recv().await {
393397
storage.handle_event(&ev).await;
394398
print_event(&ev);
395399
}

drop-transfer/src/check.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,7 @@ async fn run(state: Arc<State>, xfer: Arc<IncomingTransfer>, logger: Logger) {
4949
Err(err) => {
5050
warn!(logger, "Failed to clear incoming transfer: {err:?}");
5151
}
52-
Ok(false) => {
53-
state
54-
.event_tx
55-
.send(crate::Event::IncomingTransferCanceled(xfer.clone(), true))
56-
.expect("Could not send a file cancelled event, channel closed");
57-
}
52+
Ok(false) => state.emit_event(crate::Event::IncomingTransferCanceled(xfer.clone(), true)),
5853
_ => (),
5954
}
6055
}

drop-transfer/src/service.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
net::IpAddr,
44
path::{Component, Path},
55
sync::Arc,
6-
time::Instant,
6+
time::{Instant, SystemTime},
77
};
88

99
use drop_analytics::{InitEventData, Moose, TransferStateEventData};
@@ -25,7 +25,7 @@ use crate::{
2525
};
2626

2727
pub(super) struct State {
28-
pub(super) event_tx: mpsc::UnboundedSender<Event>,
28+
pub(super) event_tx: mpsc::UnboundedSender<(Event, SystemTime)>,
2929
pub(super) transfer_manager: TransferManager,
3030
pub(crate) moose: Arc<dyn Moose>,
3131
pub(crate) auth: Arc<auth::Context>,
@@ -37,6 +37,14 @@ pub(super) struct State {
3737
pub fdresolv: Option<Arc<crate::file::FdResolver>>,
3838
}
3939

40+
impl State {
41+
pub fn emit_event(&self, event: crate::Event) {
42+
self.event_tx
43+
.send((event, SystemTime::now()))
44+
.expect("Failed to emit Event");
45+
}
46+
}
47+
4048
pub struct Service {
4149
pub(super) state: Arc<State>,
4250
stop: CancellationToken,
@@ -50,7 +58,7 @@ impl Service {
5058
pub async fn start(
5159
addr: IpAddr,
5260
storage: Arc<Storage>,
53-
event_tx: mpsc::UnboundedSender<Event>,
61+
event_tx: mpsc::UnboundedSender<(Event, SystemTime)>,
5462
logger: Logger,
5563
config: Arc<DropConfig>,
5664
moose: Arc<dyn Moose>,
@@ -136,17 +144,12 @@ impl Service {
136144
});
137145

138146
self.state
139-
.event_tx
140-
.send(Event::OutgoingTransferFailed(xfer.clone(), err, true))
141-
.expect("Event channel should be open");
147+
.emit_event(Event::OutgoingTransferFailed(xfer.clone(), err, true));
142148

143149
return;
144150
}
145151

146-
self.state
147-
.event_tx
148-
.send(Event::RequestQueued(xfer.clone()))
149-
.expect("Could not send a RequestQueued event, channel closed");
152+
self.state.emit_event(Event::RequestQueued(xfer.clone()));
150153

151154
ws::client::spawn(
152155
self.state.clone(),
@@ -253,9 +256,7 @@ impl Service {
253256
.await;
254257

255258
self.state
256-
.event_tx
257-
.send(crate::Event::OutgoingTransferCanceled(res.xfer, false))
258-
.expect("Event channel should be open");
259+
.emit_event(crate::Event::OutgoingTransferCanceled(res.xfer, false));
259260

260261
return Ok(());
261262
}
@@ -277,9 +278,7 @@ impl Service {
277278
.await;
278279

279280
self.state
280-
.event_tx
281-
.send(crate::Event::IncomingTransferCanceled(res.xfer, false))
282-
.expect("Event channel should be open");
281+
.emit_event(crate::Event::IncomingTransferCanceled(res.xfer, false));
283282

284283
return Ok(());
285284
}

drop-transfer/src/ws/client/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,7 @@ async fn connect_to_peer(
130130
protocol_version: 0,
131131
});
132132

133-
state
134-
.event_tx
135-
.send(Event::OutgoingTransferFailed(xfer.clone(), err, false))
136-
.expect("Failed to send TransferFailed event");
133+
state.emit_event(Event::OutgoingTransferFailed(xfer.clone(), err, false));
137134

138135
return ControlFlow::Break(());
139136
}

drop-transfer/src/ws/client/v2.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,10 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
282282

283283
if by_peer {
284284
self.state
285-
.event_tx
286-
.send(crate::Event::OutgoingTransferCanceled(
285+
.emit_event(crate::Event::OutgoingTransferCanceled(
287286
self.xfer.clone(),
288287
by_peer,
289-
))
290-
.expect("Could not send a transfer cancelled event, channel closed");
288+
));
291289
}
292290
}
293291

drop-transfer/src/ws/client/v4.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,12 +326,10 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
326326

327327
if by_peer {
328328
self.state
329-
.event_tx
330-
.send(crate::Event::OutgoingTransferCanceled(
329+
.emit_event(crate::Event::OutgoingTransferCanceled(
331330
self.xfer.clone(),
332331
by_peer,
333-
))
334-
.expect("Could not send a transfer cancelled event, channel closed");
332+
));
335333
}
336334
}
337335

drop-transfer/src/ws/client/v5.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -342,12 +342,10 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
342342

343343
if by_peer {
344344
self.state
345-
.event_tx
346-
.send(crate::Event::OutgoingTransferCanceled(
345+
.emit_event(crate::Event::OutgoingTransferCanceled(
347346
self.xfer.clone(),
348347
by_peer,
349-
))
350-
.expect("Could not send a transfer cancelled event, channel closed");
348+
));
351349
}
352350
}
353351

drop-transfer/src/ws/events.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
path::PathBuf,
33
sync::Arc,
4-
time::{Duration, Instant},
4+
time::{Duration, Instant, SystemTime},
55
};
66

77
use drop_analytics::{Moose, TransferFileEventData, MOOSE_STATUS_SUCCESS};
@@ -13,12 +13,20 @@ use crate::{
1313
};
1414

1515
struct FileEventTxInner {
16-
tx: UnboundedSender<Event>,
16+
tx: UnboundedSender<(Event, SystemTime)>,
1717
moose: Arc<dyn Moose>,
1818
started: Option<Instant>,
1919
transferred: u64,
2020
}
2121

22+
impl FileEventTxInner {
23+
fn emit_event(&self, event: Event) {
24+
self.tx
25+
.send((event, SystemTime::now()))
26+
.expect("Failed to emit File event");
27+
}
28+
}
29+
2230
pub type IncomingFileEventTx = FileEventTx<IncomingTransfer>;
2331
pub type OutgoingFileEventTx = FileEventTx<OutgoingTransfer>;
2432

@@ -29,12 +37,12 @@ pub struct FileEventTx<T: Transfer> {
2937
}
3038

3139
pub struct FileEventTxFactory {
32-
events: UnboundedSender<Event>,
40+
events: UnboundedSender<(Event, SystemTime)>,
3341
moose: Arc<dyn Moose>,
3442
}
3543

3644
impl FileEventTxFactory {
37-
pub fn new(events: UnboundedSender<Event>, moose: Arc<dyn Moose>) -> Self {
45+
pub fn new(events: UnboundedSender<(Event, SystemTime)>, moose: Arc<dyn Moose>) -> Self {
3846
Self { events, moose }
3947
}
4048

@@ -72,18 +80,14 @@ impl<T: Transfer> FileEventTx<T> {
7280
_ => {}
7381
}
7482

75-
lock.tx
76-
.send(event)
77-
.expect("Event channel shouldn't be closed");
83+
lock.emit_event(event);
7884
}
7985

8086
async fn start_inner(&self, event: Event) {
8187
let mut lock = self.inner.lock().await;
8288
lock.started = Some(Instant::now());
8389

84-
lock.tx
85-
.send(event)
86-
.expect("Event channel shouldn't be closed");
90+
lock.emit_event(event);
8791
}
8892

8993
async fn stop(&self, event: Event, status: Result<(), i32>) {
@@ -119,9 +123,7 @@ impl<T: Transfer> FileEventTx<T> {
119123
result,
120124
});
121125

122-
lock.tx
123-
.send(event)
124-
.expect("Event channel shouldn't be closed");
126+
lock.emit_event(event);
125127
}
126128

127129
async fn force_stop(&self, event: Event, status: Result<(), i32>) {
@@ -157,9 +159,7 @@ impl<T: Transfer> FileEventTx<T> {
157159
result,
158160
});
159161

160-
lock.tx
161-
.send(event)
162-
.expect("Event channel shouldn't be closed");
162+
lock.emit_event(event);
163163
}
164164

165165
pub async fn stop_silent(&self, status: Status) {

drop-transfer/src/ws/server/mod.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,7 @@ impl RunContext<'_> {
520520
.await?;
521521

522522
if is_new {
523-
self.state
524-
.event_tx
525-
.send(Event::RequestReceived(xfer.clone()))
526-
.expect("Failed to notify receiving peer!");
523+
self.state.emit_event(Event::RequestReceived(xfer.clone()));
527524

528525
check::spawn(
529526
self.state.clone(),
@@ -564,9 +561,7 @@ impl RunContext<'_> {
564561
}
565562
Ok(false) => {
566563
self.state
567-
.event_tx
568-
.send(crate::Event::IncomingTransferCanceled(xfer.clone(), true))
569-
.expect("Could not send a file cancelled event, channel closed");
564+
.emit_event(crate::Event::IncomingTransferCanceled(xfer.clone(), true));
570565
}
571566
_ => (),
572567
}

norddrop/src/device/mod.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
pub mod types;
2+
pub mod utils;
23

34
use std::{
45
net::{IpAddr, ToSocketAddrs},
56
sync::Arc,
7+
time::SystemTime,
68
};
79

810
use drop_analytics::DeveloperExceptionEventData;
911
use drop_auth::{PublicKey, SecretKey, PUBLIC_KEY_LENGTH};
1012
use drop_config::{Config, DropConfig, MooseConfig};
11-
use drop_transfer::{auth, utils::Hidden, FileToSend, OutgoingTransfer, Service, Transfer};
13+
use drop_transfer::{auth, utils::Hidden, Event, FileToSend, OutgoingTransfer, Service, Transfer};
1214
use slog::{debug, error, trace, warn, Logger};
1315
use tokio::sync::{mpsc, Mutex};
1416

@@ -120,15 +122,15 @@ impl NordDropFFI {
120122
let ed = self.event_dispatcher.clone();
121123
let event_logger = self.logger.clone();
122124
let event_storage = storage.clone();
123-
let (tx, mut rx) = mpsc::unbounded_channel();
125+
let (tx, mut rx) = mpsc::unbounded_channel::<(Event, SystemTime)>();
124126

125127
self.rt.spawn(async move {
126128
let mut dispatch = drop_transfer::StorageDispatch::new(&event_storage);
127129

128130
while let Some(e) = rx.recv().await {
129131
debug!(event_logger, "emitting event: {:#?}", e);
130132

131-
dispatch.handle_event(&e).await;
133+
dispatch.handle_event(&e.0).await;
132134
// Android team reported problems with the event ordering.
133135
// The events where dispatched in different order than where emitted.
134136
// To fix that we need to process the events sequentially.
@@ -383,6 +385,7 @@ impl NordDropFFI {
383385
file: file_id,
384386
status: From::from(&e),
385387
},
388+
timestamp: utils::current_timestamp(),
386389
});
387390
}
388391
});
@@ -415,6 +418,8 @@ impl NordDropFFI {
415418
data: FinishEvent::TransferFailed {
416419
status: From::from(&e),
417420
},
421+
422+
timestamp: utils::current_timestamp(),
418423
})
419424
}
420425
});
@@ -451,6 +456,7 @@ impl NordDropFFI {
451456
file,
452457
status: From::from(&err),
453458
},
459+
timestamp: utils::current_timestamp(),
454460
});
455461
}
456462
});
@@ -607,6 +613,7 @@ fn open_database(
607613
// Inform app that we wiped the old DB file
608614
events.dispatch(types::Event::RuntimeError {
609615
status: drop_core::Status::DbLost,
616+
timestamp: utils::current_timestamp(),
610617
});
611618
};
612619

0 commit comments

Comments
 (0)