Skip to content

Commit b25b67c

Browse files
committed
Merge branch 'louis-vincent/cas-23-update-jet-to-latest-solana-crates-2ab-and-to-tonic-014a' of github.com:rpcpool/yellowstone-jet into louis-vincent/cas-23-update-jet-to-latest-solana-crates-2ab-and-to-tonic-014a
2 parents 57cd2ad + 285110c commit b25b67c

File tree

1 file changed

+51
-57
lines changed

1 file changed

+51
-57
lines changed

src/bin/jet.rs

Lines changed: 51 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,6 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
265265
)
266266
.await;
267267

268-
let local = tokio::task::LocalSet::new();
269-
270268
let shield_policy_store = if config
271269
.features
272270
.is_feature_enabled(yellowstone_jet::proto::jet::Feature::YellowstoneShield)
@@ -551,65 +549,61 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
551549

552550
tg_name_map.insert(ah.id(), "SIGINT".to_string());
553551

554-
local
555-
.run_until(async {
556-
let Some(result) = tg.join_next_with_id().await else {
557-
panic!("no task in the task group can ever happen");
558-
};
559-
macro_rules! get_id {
560-
($joinset_join_result_with_id:expr) => {
561-
match $joinset_join_result_with_id {
562-
Ok((id, _)) => *id,
563-
Err(e) => e.id().clone(),
564-
}
565-
};
552+
let Some(result) = tg.join_next_with_id().await else {
553+
panic!("no task in the task group can ever happen");
554+
};
555+
macro_rules! get_id {
556+
($joinset_join_result_with_id:expr) => {
557+
match $joinset_join_result_with_id {
558+
Ok((id, _)) => *id,
559+
Err(e) => e.id().clone(),
566560
}
567-
jet_cancellation_token.cancel();
568-
let task_id = get_id!(&result);
569-
let first = tg_name_map
570-
.remove(&task_id)
571-
.unwrap_or_else(|| format!("unknown task {task_id:?}"));
572-
warn!("shutting down, task {first} finished first with: {result:?}");
573-
rpc_admin.shutdown();
574-
rpc_solana_like.shutdown();
575-
576-
const SHUTDOWN_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
577-
let shutdown_deadline = Instant::now() + SHUTDOWN_DURATION;
578-
loop {
579-
tokio::select! {
580-
Some(result) = tg.join_next_with_id() => {
581-
let task_id = get_id!(&result);
582-
let remaining_tasks = tg.len();
583-
let name = tg_name_map
584-
.remove(&task_id)
585-
.unwrap_or_else(|| format!("unknown task {task_id:?}"));
586-
if result.is_ok() {
587-
info!("task -- {name} : finished cleanly, {remaining_tasks} remaining");
588-
} else {
589-
warn!("task -- {name} : finished with error: {result:?}, {remaining_tasks} remaining");
590-
}
591-
if remaining_tasks == 0 {
592-
break;
593-
}
594-
}
595-
_ = tokio::time::sleep_until(shutdown_deadline) => {
596-
warn!("some tasks did not shut down in time, aborting them");
597-
break;
598-
}
599-
else => {
600-
break;
601-
}
561+
};
562+
}
563+
jet_cancellation_token.cancel();
564+
let task_id = get_id!(&result);
565+
let first = tg_name_map
566+
.remove(&task_id)
567+
.unwrap_or_else(|| format!("unknown task {task_id:?}"));
568+
warn!("shutting down, task {first} finished first with: {result:?}");
569+
rpc_admin.shutdown();
570+
rpc_solana_like.shutdown();
571+
572+
const SHUTDOWN_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
573+
let shutdown_deadline = Instant::now() + SHUTDOWN_DURATION;
574+
loop {
575+
tokio::select! {
576+
Some(result) = tg.join_next_with_id() => {
577+
let task_id = get_id!(&result);
578+
let remaining_tasks = tg.len();
579+
let name = tg_name_map
580+
.remove(&task_id)
581+
.unwrap_or_else(|| format!("unknown task {task_id:?}"));
582+
if result.is_ok() {
583+
info!("task -- {name} : finished cleanly, {remaining_tasks} remaining");
584+
} else {
585+
warn!("task -- {name} : finished with error: {result:?}, {remaining_tasks} remaining");
602586
}
603-
}
604-
if !tg.is_empty() {
605-
for (_id, name) in tg_name_map.iter() {
606-
warn!("task -- {name} : did not finish in time, aborting");
587+
if remaining_tasks == 0 {
588+
break;
607589
}
608590
}
609-
tg.abort_all();
610-
Ok(())
611-
})
612-
.await
591+
_ = tokio::time::sleep_until(shutdown_deadline) => {
592+
warn!("some tasks did not shut down in time, aborting them");
593+
break;
594+
}
595+
else => {
596+
break;
597+
}
598+
}
599+
}
600+
if !tg.is_empty() {
601+
for (_id, name) in tg_name_map.iter() {
602+
warn!("task -- {name} : did not finish in time, aborting");
603+
}
604+
}
605+
tg.abort_all();
606+
Ok(())
613607
}
614608

615609
async fn spawn_push_prometheus_metrics(

0 commit comments

Comments
 (0)