Skip to content

Commit 3496205

Browse files
committed
unify hook interface
1 parent f965df6 commit 3496205

File tree

5 files changed

+145
-124
lines changed

5 files changed

+145
-124
lines changed

twmq/src/lib.rs

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod job;
44
pub mod queue;
55

66
use std::marker::PhantomData;
7+
use std::process::Output;
78
use std::sync::Arc;
89
use std::time::{Duration, SystemTime, UNIX_EPOCH};
910

@@ -23,6 +24,20 @@ use tokio::time::sleep;
2324
pub use redis;
2425
use tracing::Instrument;
2526

27+
pub struct SuccessHookData<'a, O> {
28+
pub result: &'a O,
29+
}
30+
31+
pub struct NackHookData<'a, E> {
32+
pub error: &'a E,
33+
pub delay: Option<Duration>,
34+
pub position: RequeuePosition,
35+
}
36+
37+
pub struct FailHookData<'a, E> {
38+
pub error: &'a E,
39+
}
40+
2641
// Main DurableExecution trait
2742
pub trait DurableExecution: Sized {
2843
type Output: Serialize + DeserializeOwned + Send + Sync;
@@ -32,33 +47,45 @@ pub trait DurableExecution: Sized {
3247
// Required method to process a job
3348
fn process(
3449
job: &Job<Self>,
50+
c: &Self::ExecutionContext,
3551
) -> impl Future<Output = JobResult<Self::Output, Self::ErrorData>> + Send + Sync;
3652

3753
fn on_success(
3854
&self,
39-
result: &Self::Output,
40-
tx: &mut TransactionContext<'_>,
41-
c: &Self::ExecutionContext,
42-
) -> impl Future<Output = ()> + Send + Sync;
55+
_job: &Job<Self>,
56+
_d: SuccessHookData<Self::Output>,
57+
_tx: &mut TransactionContext<'_>,
58+
_c: &Self::ExecutionContext,
59+
) -> impl Future<Output = ()> + Send + Sync {
60+
std::future::ready(())
61+
}
4362

4463
fn on_nack(
4564
&self,
46-
error: &Self::ErrorData,
47-
delay: Option<Duration>,
48-
position: RequeuePosition,
49-
tx: &mut TransactionContext<'_>,
50-
c: &Self::ExecutionContext,
51-
) -> impl Future<Output = ()> + Send + Sync;
65+
_job: &Job<Self>,
66+
_d: NackHookData<Self::ErrorData>,
67+
_tx: &mut TransactionContext<'_>,
68+
_c: &Self::ExecutionContext,
69+
) -> impl Future<Output = ()> + Send + Sync {
70+
std::future::ready(())
71+
}
5272

5373
fn on_fail(
5474
&self,
55-
error: &Self::ErrorData,
56-
tx: &mut TransactionContext<'_>,
57-
c: &Self::ExecutionContext,
58-
) -> impl Future<Output = ()> + Send + Sync;
75+
_job: &Job<Self>,
76+
_d: FailHookData<Self::ErrorData>,
77+
_tx: &mut TransactionContext<'_>,
78+
_c: &Self::ExecutionContext,
79+
) -> impl Future<Output = ()> + Send + Sync {
80+
std::future::ready(())
81+
}
5982

60-
fn on_timeout(&self, tx: &mut TransactionContext<'_>)
61-
-> impl Future<Output = ()> + Send + Sync;
83+
fn on_timeout(
84+
&self,
85+
_tx: &mut TransactionContext<'_>,
86+
) -> impl Future<Output = ()> + Send + Sync {
87+
std::future::ready(())
88+
}
6289
}
6390

6491
// Main Queue struct
@@ -358,7 +385,7 @@ where
358385

359386
tokio::spawn(async move {
360387
// Process job - note we don't pass a context here
361-
let result = DurableExecution::process(&job).await;
388+
let result = DurableExecution::process(&job, &execution_context).await;
362389

363390
// Mark job as complete (automatically happens in completion handlers)
364391

@@ -374,8 +401,12 @@ where
374401
queue_clone.name().to_string(),
375402
);
376403

404+
let success_hook_data = SuccessHookData {
405+
result: &output,
406+
};
407+
377408
// Call success hook to populate transaction context
378-
job.data.on_success(&output, &mut tx_context, &execution_context).await;
409+
job.data.on_success(&job, success_hook_data, &mut tx_context, &execution_context).await;
379410

380411
// Complete job with success and execute transaction
381412
if let Err(e) = queue_clone
@@ -408,9 +439,15 @@ where
408439
queue_clone.name().to_string(),
409440
);
410441

442+
let nack_hook_data = NackHookData {
443+
error: &error,
444+
delay,
445+
position,
446+
};
447+
411448
// Call nack hook to populate transaction context
412449
job.data
413-
.on_nack(&error, delay, position, &mut tx_context,&execution_context)
450+
.on_nack(&job, nack_hook_data, &mut tx_context,&execution_context)
414451
.await;
415452

416453
// Complete job with nack and execute transaction
@@ -442,8 +479,12 @@ where
442479
queue_clone.name.clone(),
443480
);
444481

482+
let fail_hook_data = FailHookData {
483+
error: &error
484+
};
485+
445486
// Call fail hook to populate transaction context
446-
job.data.on_fail(&error, &mut tx_context, &execution_context).await;
487+
job.data.on_fail(&job, fail_hook_data, &mut tx_context, &execution_context).await;
447488

448489
// Complete job with fail and execute transaction
449490
if let Err(e) = queue_clone

twmq/tests/basic_hook.rs

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ use std::{
1414

1515
use serde::{Deserialize, Serialize};
1616
use twmq::{
17-
DurableExecution, Queue,
17+
DurableExecution, Queue, SuccessHookData,
1818
hooks::TransactionContext,
19-
job::{Job, JobResult, JobStatus, RequeuePosition},
19+
job::{Job, JobResult, JobStatus},
2020
queue::QueueOptions,
2121
};
2222

@@ -78,7 +78,10 @@ impl DurableExecution for MainJobPayload {
7878
type ErrorData = TestJobErrorData;
7979
type ExecutionContext = Arc<WebhookQueue>;
8080

81-
async fn process(job: &Job<Self>) -> JobResult<Self::Output, Self::ErrorData> {
81+
async fn process(
82+
job: &Job<Self>,
83+
_: &Self::ExecutionContext,
84+
) -> JobResult<Self::Output, Self::ErrorData> {
8285
println!("MAIN_JOB: Processing job with id: {}", job.id);
8386
tokio::time::sleep(Duration::from_millis(50)).await;
8487

@@ -91,15 +94,16 @@ impl DurableExecution for MainJobPayload {
9194

9295
async fn on_success(
9396
&self,
94-
result: &Self::Output,
97+
_job: &Job<Self>,
98+
d: SuccessHookData<'_, Self::Output>,
9599
tx: &mut TransactionContext<'_>,
96100
ec: &Self::ExecutionContext,
97101
) {
98102
println!("MAIN_JOB: on_success hook - queuing webhook job");
99103

100104
let webhook_job = WebhookJobPayload {
101105
url: "https://api.example.com/webhook".to_string(),
102-
payload: serde_json::to_string(result).unwrap(),
106+
payload: serde_json::to_string(d.result).unwrap(),
103107
parent_job_id: self.id_to_check.clone(),
104108
};
105109

@@ -114,32 +118,17 @@ impl DurableExecution for MainJobPayload {
114118
tracing::info!("Successfully queued webhook job!");
115119
}
116120
}
117-
118-
async fn on_nack(
119-
&self,
120-
_error: &Self::ErrorData,
121-
_delay: Option<Duration>,
122-
_position: RequeuePosition,
123-
_tx: &mut TransactionContext<'_>,
124-
_: &Self::ExecutionContext,
125-
) {
126-
}
127-
async fn on_fail(
128-
&self,
129-
_error: &Self::ErrorData,
130-
_tx: &mut TransactionContext<'_>,
131-
_c: &Self::ExecutionContext,
132-
) {
133-
}
134-
async fn on_timeout(&self, _tx: &mut TransactionContext<'_>) {}
135121
}
136122

137123
impl DurableExecution for WebhookJobPayload {
138124
type Output = WebhookJobOutput;
139125
type ErrorData = WebhookJobErrorData;
140126
type ExecutionContext = ();
141127

142-
async fn process(job: &Job<Self>) -> JobResult<Self::Output, Self::ErrorData> {
128+
async fn process(
129+
job: &Job<Self>,
130+
_: &Self::ExecutionContext,
131+
) -> JobResult<Self::Output, Self::ErrorData> {
143132
println!("WEBHOOK_JOB: Sending webhook to: {}", job.data.url);
144133
println!("WEBHOOK_JOB: Payload: {}", job.data.payload);
145134
tokio::time::sleep(Duration::from_millis(25)).await;
@@ -155,32 +144,17 @@ impl DurableExecution for WebhookJobPayload {
155144

156145
async fn on_success(
157146
&self,
158-
_result: &Self::Output,
147+
_job: &Job<Self>,
148+
_d: SuccessHookData<'_, Self::Output>,
159149
_tx: &mut TransactionContext<'_>,
160-
_: &Self::ExecutionContext,
150+
_ec: &Self::ExecutionContext,
161151
) {
162-
println!(
152+
tracing::info!(
163153
"WEBHOOK_JOB: Webhook delivered successfully for parent: {}",
164154
self.parent_job_id
165155
);
166156
}
167157

168-
async fn on_nack(
169-
&self,
170-
_error: &Self::ErrorData,
171-
_delay: Option<Duration>,
172-
_position: RequeuePosition,
173-
_tx: &mut TransactionContext<'_>,
174-
_: &Self::ExecutionContext,
175-
) {
176-
}
177-
async fn on_fail(
178-
&self,
179-
_error: &Self::ErrorData,
180-
_tx: &mut TransactionContext<'_>,
181-
_c: &Self::ExecutionContext,
182-
) {
183-
}
184158
async fn on_timeout(&self, _tx: &mut TransactionContext<'_>) {}
185159
}
186160

twmq/tests/fixtures.rs

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use std::time::Duration;
66
use serde::{Deserialize, Serialize};
77

88
// Assuming your library's root is `twmq` or you use `crate::` if in lib.rs
9-
use twmq::DurableExecution;
109
use twmq::hooks::TransactionContext;
11-
use twmq::job::{Job, JobResult, RequeuePosition};
10+
use twmq::job::{Job, JobResult};
11+
use twmq::{DurableExecution, SuccessHookData};
1212

1313
// --- Test Job Definition ---
1414

@@ -47,7 +47,10 @@ impl DurableExecution for TestJobPayload {
4747

4848
// If not using async_trait, the signature is:
4949
// fn process(&self) -> impl std::future::Future<Output = JobResult<Self::Output, Self::ErrorData>> + Send + Sync {
50-
async fn process(job: &Job<Self>) -> JobResult<Self::Output, Self::ErrorData> {
50+
async fn process(
51+
job: &Job<Self>,
52+
_: &Self::ExecutionContext,
53+
) -> JobResult<Self::Output, Self::ErrorData> {
5154
println!(
5255
"TEST_JOB: Processing job with id_to_check: {}",
5356
job.data.id_to_check
@@ -60,40 +63,15 @@ impl DurableExecution for TestJobPayload {
6063
})
6164
}
6265

63-
async fn on_success(&self, _result: &Self::Output, _tx: &mut TransactionContext<'_>, _: &()) {
64-
println!(
65-
"TEST_JOB: on_success hook for id_to_check: {}",
66-
self.id_to_check
67-
);
68-
}
69-
70-
async fn on_nack(
66+
async fn on_success(
7167
&self,
72-
_error: &Self::ErrorData,
73-
_delay: Option<Duration>,
74-
_position: RequeuePosition,
68+
_job: &Job<Self>,
69+
_d: SuccessHookData<'_, Self::Output>,
7570
_tx: &mut TransactionContext<'_>,
76-
_: &(),
71+
_ec: &Self::ExecutionContext,
7772
) {
78-
// Not expected for this test
79-
println!(
80-
"TEST_JOB: on_nack hook for id_to_check: {}",
81-
self.id_to_check
82-
);
83-
}
84-
85-
async fn on_fail(&self, _error: &Self::ErrorData, _tx: &mut TransactionContext<'_>, _: &()) {
86-
// Not expected for this test
87-
println!(
88-
"TEST_JOB: on_fail hook for id_to_check: {}",
89-
self.id_to_check
90-
);
91-
}
92-
93-
async fn on_timeout(&self, _tx: &mut TransactionContext<'_>) {
94-
// Not expected for this test
95-
println!(
96-
"TEST_JOB: on_timeout hook for id_to_check: {}",
73+
tracing::info!(
74+
"TEST_JOB: on_success hook for id_to_check: {}",
9775
self.id_to_check
9876
);
9977
}

0 commit comments

Comments
 (0)