From 13dfbdbed69ab0f65ba381d303e3ae1ee4392164 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Wed, 2 Apr 2025 14:13:17 +0300 Subject: [PATCH] Fixes deadlock in Node startup. --- .../benchmarks/aeron/remote/ArchiveNode.java | 11 ++++++----- .../real_logic/benchmarks/aeron/remote/EchoNode.java | 6 +++--- .../benchmarks/aeron/remote/ReplayNode.java | 11 ++++++----- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ArchiveNode.java b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ArchiveNode.java index 1384a033..e5536d73 100644 --- a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ArchiveNode.java +++ b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ArchiveNode.java @@ -48,6 +48,7 @@ public final class ArchiveNode implements AutoCloseable, Runnable private final boolean ownsArchiveClient; private final ExclusivePublication publication; private final Subscription subscription; + private final Aeron aeron; ArchiveNode(final AtomicBoolean running) { @@ -65,7 +66,7 @@ public final class ArchiveNode implements AutoCloseable, Runnable this.aeronArchive = aeronArchive; this.ownsArchiveClient = ownsArchiveClient; - final Aeron aeron = aeronArchive.context().aeron(); + this.aeron = aeronArchive.context().aeron(); subscription = aeron.addSubscription(destinationChannel(), destinationStreamId()); @@ -76,17 +77,17 @@ public final class ArchiveNode implements AutoCloseable, Runnable final int publicationSessionId = publication.sessionId(); final String channel = addSessionId(recordChannel, publicationSessionId); aeronArchive.startRecording(channel, recordStreamId, LOCAL, true); + } + public void run() + { awaitConnected( () -> subscription.isConnected() && publication.isConnected(), connectionTimeoutNs(), SystemNanoClock.INSTANCE); - awaitRecordingStart(aeron, publicationSessionId, aeronArchive.archiveId()); - } + awaitRecordingStart(aeron, publication.sessionId(), aeronArchive.archiveId()); - public void run() - { pipeMessages(subscription, publication, running); } diff --git a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/EchoNode.java b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/EchoNode.java index 6b646592..b8003595 100644 --- a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/EchoNode.java +++ b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/EchoNode.java @@ -89,15 +89,15 @@ public final class EchoNode implements AutoCloseable, Runnable .commit(); } }; + } + public void run() + { awaitConnected( () -> subscription.isConnected() && publication.isConnected() && publication.availableWindow() > 0, connectionTimeoutNs(), SystemNanoClock.INSTANCE); - } - public void run() - { final IdleStrategy idleStrategy = idleStrategy(); final AtomicBoolean running = this.running; diff --git a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ReplayNode.java b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ReplayNode.java index 584bac93..ce20f9c4 100644 --- a/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ReplayNode.java +++ b/benchmarks-aeron/src/main/java/uk/co/real_logic/benchmarks/aeron/remote/ReplayNode.java @@ -56,7 +56,8 @@ public final class ReplayNode implements AutoCloseable, Runnable private final MediaDriver mediaDriver; private final AeronArchive aeronArchive; private final boolean ownsArchiveClient; - private final Image image; + private final int sessionId; + private Image image; ReplayNode(final AtomicBoolean running) { @@ -89,7 +90,7 @@ public final class ReplayNode implements AutoCloseable, Runnable final String replayChannel = replayChannel(); final int replayStreamId = replayStreamId(); final long replaySessionId = replayFullRecording(aeronArchive, recordingId, replayChannel, replayStreamId); - final int sessionId = (int)replaySessionId; + this.sessionId = (int)replaySessionId; subscription = aeron.addSubscription(addSessionId(replayChannel, sessionId), replayStreamId); @@ -109,17 +110,17 @@ public final class ReplayNode implements AutoCloseable, Runnable .commit(); } }; + } + public void run() + { awaitConnected( () -> subscription.isConnected() && publication.isConnected(), connectionTimeoutNs(), SystemNanoClock.INSTANCE); image = subscription.imageBySessionId(sessionId); - } - public void run() - { final IdleStrategy idleStrategy = idleStrategy(); final AtomicBoolean running = this.running;