Skip to content

Commit 6273f38

Browse files
Merge pull request #49 from hyperledger/multiplex-operation-updates-fix
Multiplex operation updates fix
2 parents 01cd604 + 84b7a64 commit 6273f38

File tree

13 files changed

+83
-22
lines changed

13 files changed

+83
-22
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

firefly-balius/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-balius"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "Helpers to write contracts for the FireFly Cardano connector"
55
license-file.workspace = true
66
publish = false

firefly-cardanoconnect/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-cardanoconnect"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "An implementation of the FireFly Connector API for Cardano"
55
license-file.workspace = true
66
publish = false

firefly-cardanoconnect/src/routes/ws.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use axum::{
1010
};
1111
use chrono::{DateTime, Utc};
1212
use serde::{Deserialize, Serialize};
13-
use tracing::{Level, error, instrument, warn};
13+
use tracing::{Level, error, info, instrument, warn};
1414

1515
use crate::{
1616
AppState,
@@ -30,6 +30,7 @@ async fn handle_socket(
3030
IncomingMessage::Listen(ListenMessage { topic }) => topic,
3131
other => bail!("unexpected first message: {:?}", other),
3232
};
33+
info!("caller listening on topic {topic}");
3334
let mut subscription = stream_manager.subscribe(&topic).await?;
3435

3536
while let Some(batch) = subscription.recv().await {
@@ -238,7 +239,9 @@ pub async fn handle_socket_upgrade(
238239
State(app_state): State<AppState>,
239240
ws: WebSocketUpgrade,
240241
) -> Response {
242+
info!("Incoming websocket connection");
241243
ws.on_upgrade(|socket| async move {
244+
info!("Websocket connection succeeded");
242245
if let Err(error) = handle_socket(app_state, socket).await {
243246
warn!("socket error: {:?}", error);
244247
}

firefly-cardanoconnect/src/streams/mux.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ impl StreamDispatcherWorker {
283283
let mut batch_sink = None;
284284
loop {
285285
let (batch, ack_rx, last_op_id, hwms) = select! {
286-
batch = self.build_batch(&mut operation_update_source), if batch_sink.is_some() && !self.listeners.is_empty() => batch,
286+
batch = self.build_batch(&mut operation_update_source), if batch_sink.is_some() => batch,
287287
Some(change) = state_change_source.recv() => {
288288
match change {
289289
StreamDispatcherStateChange::NewSettings(settings) => {
@@ -386,7 +386,7 @@ impl StreamDispatcherWorker {
386386
}
387387
while events.len() < self.batch_size {
388388
select! {
389-
_ = self.collect_contract_events(hwms, events) => {}
389+
_ = self.collect_contract_events(hwms, events), if !self.listeners.is_empty() => {}
390390
Ok(()) = operation_update_source.changed() => {
391391
self.collect_operation_events(last_op, events).await;
392392
}
@@ -542,13 +542,15 @@ mod tests {
542542
use crate::{
543543
blockchain::BlockchainClient,
544544
contracts::ContractManager,
545+
operations::{Operation, OperationId, OperationStatus},
545546
persistence,
546547
streams::{
547-
BlockReference, Listener, ListenerFilter, ListenerType, Stream, mux::Multiplexer,
548+
BatchEvent, BlockReference, Listener, ListenerFilter, ListenerType, Stream,
549+
mux::Multiplexer,
548550
},
549551
};
550552
use firefly_server::apitypes::ApiError;
551-
use tokio::sync::watch;
553+
use tokio::{sync::watch, time::timeout};
552554

553555
#[tokio::test]
554556
async fn should_ack_events() -> Result<(), ApiError> {
@@ -649,4 +651,59 @@ mod tests {
649651

650652
Ok(())
651653
}
654+
655+
#[tokio::test]
656+
async fn should_surface_operation_updates_without_listeners() -> Result<(), ApiError> {
657+
let blockchain = Arc::new(BlockchainClient::mock().await);
658+
let contracts = Arc::new(ContractManager::none());
659+
let persistence = persistence::init(&persistence::PersistenceConfig::Mock).await?;
660+
let operation_update_sink = watch::Sender::new(None);
661+
let mux = Multiplexer::new(
662+
blockchain.clone(),
663+
contracts,
664+
persistence.clone(),
665+
operation_update_sink.clone(),
666+
)
667+
.await?;
668+
669+
let stream = Stream {
670+
id: "stream_id".to_string().into(),
671+
name: "Some Stream".into(),
672+
batch_size: 5,
673+
batch_timeout: Duration::from_millis(100),
674+
};
675+
persistence.write_stream(&stream).await?;
676+
mux.handle_stream_write(&stream).await?;
677+
678+
let mut subscription = mux.subscribe("Some Stream").await?;
679+
680+
let operation = Operation {
681+
id: OperationId::from("op".to_string()),
682+
status: OperationStatus::Pending,
683+
tx_id: None,
684+
contract_address: None,
685+
};
686+
687+
let update_id = persistence.write_operation(&operation).await?;
688+
operation_update_sink.send_replace(Some(update_id.clone()));
689+
690+
// We receive a new batch with the operation update
691+
let batch = subscription.recv().await.unwrap();
692+
assert_eq!(batch.batch_number, 1);
693+
assert_eq!(batch.events, vec![BatchEvent::Receipt(operation)]);
694+
batch.ack();
695+
696+
// We don't receive any additional batches
697+
assert!(
698+
timeout(Duration::from_millis(500), subscription.recv())
699+
.await
700+
.is_err()
701+
);
702+
703+
// The checkpoint is updated
704+
let checkpoint = persistence.read_checkpoint(&stream.id).await?.unwrap();
705+
assert_eq!(checkpoint.last_operation_id, Some(update_id));
706+
707+
Ok(())
708+
}
652709
}

firefly-cardanosigner/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-cardanosigner"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "A service managing keys and signing for the FireFly Cardano connector"
55
license-file.workspace = true
66
publish = false

firefly-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-server"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "Internal library with shared code for services"
55
license-file.workspace = true
66
publish = false

scripts/demo/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-cardano-demo"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "A demo of the firefly-cardanoconnect API"
55
license-file.workspace = true
66
publish = false

scripts/deploy-contract/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-cardano-deploy-contract"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "A script to build and deploy a Balius smart contract to the FireFly Cardano connector"
55
license-file.workspace = true
66
publish = false

scripts/deploy/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "firefly-cardano-deploy"
3-
version = "0.6.0"
3+
version = "0.6.1"
44
description = "A script to build and deploy a Balius-backed API to FireFly"
55
license-file.workspace = true
66
publish = false

0 commit comments

Comments
 (0)