Skip to content

Commit b1d6a2c

Browse files
committed
sdk: allow types to abstract over encoding
1 parent 409e74e commit b1d6a2c

21 files changed

+277
-194
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ tonic-build = "0.11"
1515
opentelemetry = "0.22"
1616
prost = "0.12"
1717
prost-types = "0.12"
18+
serde_json = "1.0"

core/src/core_tests/local_activities.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use temporal_sdk_core_protos::{
3434
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
3535
workflow_commands::{ActivityCancellationType, ScheduleLocalActivity},
3636
workflow_completion::WorkflowActivationCompletion,
37-
ActivityTaskCompletion, AsJsonPayloadExt,
37+
ActivityTaskCompletion, ToPayload,
3838
},
3939
temporal::api::{
4040
common::v1::RetryPolicy,
@@ -91,7 +91,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
9191
|ctx: WfContext| async move {
9292
let la = ctx.local_activity(LocalActivityOptions {
9393
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
94-
input: "hi".as_json_payload().expect("serializes fine"),
94+
input: "hi".to_payload().expect("serializes fine"),
9595
..Default::default()
9696
});
9797
ctx.timer(Duration::from_secs(1)).await;
@@ -117,9 +117,7 @@ pub async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> {
117117
.map(|i| {
118118
ctx.local_activity(LocalActivityOptions {
119119
activity_type: "echo".to_string(),
120-
input: format!("Hi {i}")
121-
.as_json_payload()
122-
.expect("serializes fine"),
120+
input: format!("Hi {i}").to_payload().expect("serializes fine"),
123121
..Default::default()
124122
})
125123
})
@@ -198,7 +196,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
198196
|ctx: WfContext| async move {
199197
ctx.local_activity(LocalActivityOptions {
200198
activity_type: "echo".to_string(),
201-
input: "hi".as_json_payload().expect("serializes fine"),
199+
input: "hi".to_payload().expect("serializes fine"),
202200
..Default::default()
203201
})
204202
.await;
@@ -254,7 +252,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
254252
let la_res = ctx
255253
.local_activity(LocalActivityOptions {
256254
activity_type: "echo".to_string(),
257-
input: "hi".as_json_payload().expect("serializes fine"),
255+
input: "hi".to_payload().expect("serializes fine"),
258256
retry_policy: RetryPolicy {
259257
initial_interval: Some(prost_dur!(from_millis(50))),
260258
backoff_coefficient: 1.2,
@@ -335,7 +333,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
335333
let la_res = ctx
336334
.local_activity(LocalActivityOptions {
337335
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
338-
input: "hi".as_json_payload().expect("serializes fine"),
336+
input: "hi".to_payload().expect("serializes fine"),
339337
retry_policy: RetryPolicy {
340338
initial_interval: Some(prost_dur!(from_millis(65))),
341339
// This will make the second backoff 65 seconds, plenty to use timer
@@ -389,7 +387,7 @@ async fn local_act_null_result() {
389387
|ctx: WfContext| async move {
390388
ctx.local_activity(LocalActivityOptions {
391389
activity_type: "nullres".to_string(),
392-
input: "hi".as_json_payload().expect("serializes fine"),
390+
input: "hi".to_payload().expect("serializes fine"),
393391
..Default::default()
394392
})
395393
.await;
@@ -432,7 +430,7 @@ async fn local_act_command_immediately_follows_la_marker() {
432430
|ctx: WfContext| async move {
433431
ctx.local_activity(LocalActivityOptions {
434432
activity_type: "nullres".to_string(),
435-
input: "hi".as_json_payload().expect("serializes fine"),
433+
input: "hi".to_payload().expect("serializes fine"),
436434
..Default::default()
437435
})
438436
.await;
@@ -736,7 +734,7 @@ async fn test_schedule_to_start_timeout() {
736734
let la_res = ctx
737735
.local_activity(LocalActivityOptions {
738736
activity_type: "echo".to_string(),
739-
input: "hi".as_json_payload().expect("serializes fine"),
737+
input: "hi".to_payload().expect("serializes fine"),
740738
// Impossibly small timeout so we timeout in the queue
741739
schedule_to_start_timeout: prost_dur!(from_nanos(1)),
742740
..Default::default()
@@ -824,7 +822,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
824822
let la_res = ctx
825823
.local_activity(LocalActivityOptions {
826824
activity_type: "echo".to_string(),
827-
input: "hi".as_json_payload().expect("serializes fine"),
825+
input: "hi".to_payload().expect("serializes fine"),
828826
retry_policy: RetryPolicy {
829827
initial_interval: Some(prost_dur!(from_millis(50))),
830828
backoff_coefficient: 1.2,
@@ -897,7 +895,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet
897895
let la_res = ctx
898896
.local_activity(LocalActivityOptions {
899897
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
900-
input: "hi".as_json_payload().expect("serializes fine"),
898+
input: "hi".to_payload().expect("serializes fine"),
901899
retry_policy: RetryPolicy {
902900
initial_interval: Some(prost_dur!(from_millis(20))),
903901
backoff_coefficient: 1.0,
@@ -971,7 +969,7 @@ async fn wft_failure_cancels_running_las() {
971969
|ctx: WfContext| async move {
972970
let la_handle = ctx.local_activity(LocalActivityOptions {
973971
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
974-
input: "hi".as_json_payload().expect("serializes fine"),
972+
input: "hi".to_payload().expect("serializes fine"),
975973
..Default::default()
976974
});
977975
tokio::join!(
@@ -1038,7 +1036,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
10381036
WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move {
10391037
ctx.local_activity(LocalActivityOptions {
10401038
activity_type: "echo".to_string(),
1041-
input: "hi".as_json_payload().expect("serializes fine"),
1039+
input: "hi".to_payload().expect("serializes fine"),
10421040
..Default::default()
10431041
})
10441042
.await;
@@ -1092,7 +1090,7 @@ async fn local_act_records_nonfirst_attempts_ok() {
10921090
|ctx: WfContext| async move {
10931091
ctx.local_activity(LocalActivityOptions {
10941092
activity_type: "echo".to_string(),
1095-
input: "hi".as_json_payload().expect("serializes fine"),
1093+
input: "hi".to_payload().expect("serializes fine"),
10961094
retry_policy: RetryPolicy {
10971095
initial_interval: Some(prost_dur!(from_millis(10))),
10981096
backoff_coefficient: 1.0,

core/src/worker/workflow/machines/local_activity_state_machine.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -893,7 +893,7 @@ mod tests {
893893
use temporal_sdk_core_protos::{
894894
coresdk::{
895895
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
896-
AsJsonPayloadExt,
896+
ToPayload,
897897
},
898898
temporal::api::{
899899
common::v1::RetryPolicy, enums::v1::WorkflowTaskFailedCause, failure::v1::Failure,
@@ -906,7 +906,7 @@ mod tests {
906906
async fn la_wf(ctx: WfContext) -> WorkflowResult<()> {
907907
ctx.local_activity(LocalActivityOptions {
908908
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
909-
input: ().as_json_payload().unwrap(),
909+
input: ().to_payload().unwrap(),
910910
retry_policy: RetryPolicy {
911911
maximum_attempts: 1,
912912
..Default::default()
@@ -1003,13 +1003,13 @@ mod tests {
10031003
async fn two_la_wf(ctx: WfContext) -> WorkflowResult<()> {
10041004
ctx.local_activity(LocalActivityOptions {
10051005
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1006-
input: ().as_json_payload().unwrap(),
1006+
input: ().to_payload().unwrap(),
10071007
..Default::default()
10081008
})
10091009
.await;
10101010
ctx.local_activity(LocalActivityOptions {
10111011
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1012-
input: ().as_json_payload().unwrap(),
1012+
input: ().to_payload().unwrap(),
10131013
..Default::default()
10141014
})
10151015
.await;
@@ -1020,12 +1020,12 @@ mod tests {
10201020
tokio::join!(
10211021
ctx.local_activity(LocalActivityOptions {
10221022
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1023-
input: ().as_json_payload().unwrap(),
1023+
input: ().to_payload().unwrap(),
10241024
..Default::default()
10251025
}),
10261026
ctx.local_activity(LocalActivityOptions {
10271027
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1028-
input: ().as_json_payload().unwrap(),
1028+
input: ().to_payload().unwrap(),
10291029
..Default::default()
10301030
})
10311031
);
@@ -1131,14 +1131,14 @@ mod tests {
11311131
async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> {
11321132
ctx.local_activity(LocalActivityOptions {
11331133
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1134-
input: ().as_json_payload().unwrap(),
1134+
input: ().to_payload().unwrap(),
11351135
..Default::default()
11361136
})
11371137
.await;
11381138
ctx.timer(Duration::from_secs(5)).await;
11391139
ctx.local_activity(LocalActivityOptions {
11401140
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
1141-
input: ().as_json_payload().unwrap(),
1141+
input: ().to_payload().unwrap(),
11421142
..Default::default()
11431143
})
11441144
.await;
@@ -1426,7 +1426,7 @@ mod tests {
14261426
worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move {
14271427
let la = ctx.local_activity(LocalActivityOptions {
14281428
cancel_type,
1429-
input: ().as_json_payload().unwrap(),
1429+
input: ().to_payload().unwrap(),
14301430
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
14311431
..Default::default()
14321432
});

core/src/worker/workflow/machines/patch_state_machine.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::{
3939
};
4040
use temporal_sdk_core_protos::{
4141
constants::PATCH_MARKER_NAME,
42-
coresdk::{common::build_has_change_marker_details, AsJsonPayloadExt},
42+
coresdk::{common::build_has_change_marker_details, ToPayload},
4343
temporal::api::{
4444
command::v1::{
4545
Command, RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes,
@@ -135,7 +135,7 @@ pub(super) fn has_change<'a>(
135135
let mut all_ids = BTreeSet::from_iter(existing_patch_ids);
136136
all_ids.insert(machine.shared_state.patch_id.as_str());
137137
let serialized = all_ids
138-
.as_json_payload()
138+
.to_payload()
139139
.context("Could not serialize search attribute value for patch machine")
140140
.map_err(|e| WFMachinesError::Fatal(e.to_string()))?;
141141

@@ -296,7 +296,7 @@ mod tests {
296296
coresdk::{
297297
common::decode_change_marker_details,
298298
workflow_activation::{workflow_activation_job, NotifyHasPatch, WorkflowActivationJob},
299-
AsJsonPayloadExt, FromJsonPayloadExt,
299+
FromPayload, ToPayload,
300300
},
301301
temporal::api::{
302302
command::v1::{
@@ -606,7 +606,7 @@ mod tests {
606606
{ search_attributes: Some(attrs) }
607607
)
608608
if attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap()
609-
== &[MY_PATCH_ID].as_json_payload().unwrap()
609+
== &[MY_PATCH_ID].to_payload().unwrap()
610610
);
611611
}
612612
// The only time the "old" timer should fire is in v2, replaying, without a marker.
@@ -790,7 +790,7 @@ mod tests {
790790
);
791791
let expected_patches: HashSet<String, _> =
792792
(1..i).map(|i| format!("patch-{i}")).collect();
793-
let deserialized = HashSet::<String, RandomState>::from_json_payload(
793+
let deserialized = HashSet::<String, RandomState>::from_payload(
794794
attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap(),
795795
)
796796
.unwrap();

core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ mod tests {
211211
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
212212
workflow_commands::SetPatchMarker,
213213
workflow_completion::WorkflowActivationCompletion,
214-
AsJsonPayloadExt,
214+
ToPayload,
215215
},
216216
temporal::api::{
217217
command::v1::command::Attributes, common::v1::Payload,
@@ -370,7 +370,7 @@ mod tests {
370370
let mut ver_upsert = HashMap::new();
371371
ver_upsert.insert(
372372
VERSION_SEARCH_ATTR_KEY.to_string(),
373-
"hi".as_json_payload().unwrap(),
373+
"hi".to_payload().unwrap(),
374374
);
375375
let act = core.poll_workflow_activation().await.unwrap();
376376
let mut cmds = if with_patched_cmd {

sdk-core-protos/src/history_builder.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
},
88
external_data::LocalActivityMarkerData,
99
workflow_commands::ScheduleActivity,
10-
AsJsonPayloadExt, IntoPayloadsExt,
10+
IntoPayloadsExt, Json, ToPayload,
1111
},
1212
temporal::api::{
1313
common::v1::{
@@ -17,8 +17,7 @@ use crate::{
1717
failure::v1::{failure, CanceledFailureInfo, Failure},
1818
history::v1::{history_event::Attributes, *},
1919
taskqueue::v1::TaskQueue,
20-
update,
21-
update::v1::outcome,
20+
update::{self, v1::outcome},
2221
},
2322
HistoryInfo,
2423
};
@@ -429,7 +428,7 @@ impl TestHistoryBuilder {
429428
let mut indexed_fields = HashMap::new();
430429
indexed_fields.insert(
431430
"TemporalChangeVersion".to_string(),
432-
attribs.as_json_payload().unwrap(),
431+
attribs.to_payload().unwrap(),
433432
);
434433
let attrs = UpsertWorkflowSearchAttributesEventAttributes {
435434
workflow_task_completed_event_id: self.previous_task_completed_id,
@@ -605,6 +604,10 @@ impl TestHistoryBuilder {
605604
}
606605
}
607606

607+
impl ToPayload for &[String] {
608+
type Encoder = Json;
609+
}
610+
608611
fn default_attribs(et: EventType) -> Result<Attributes> {
609612
Ok(match et {
610613
EventType::WorkflowExecutionStarted => default_wes_attribs().into(),

0 commit comments

Comments
 (0)