Skip to content

Commit 8013c5d

Browse files
authored
Remove serializable WF inputs feature (#701)
1 parent bcda5d9 commit 8013c5d

File tree

21 files changed

+15
-469
lines changed

21 files changed

+15
-469
lines changed

.cargo/config.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
[alias]
22
integ-test = ["run", "--package", "temporal-sdk-core", "--example", "integ_runner", "--"]
3-
wf-input-replay = ["run", "--package", "temporal-sdk-core", "--features", "save_wf_inputs",
4-
"--example", "wf_input_replay", "--"]
53
lint = ["clippy", "--workspace", "--examples", "--all-features",
6-
"--test", "integ_tests", "--test", "heavy_tests", "--", "--D", "warnings"]
4+
"--test", "integ_tests", "--test", "heavy_tests", "--", "--D", "warnings"]
75
test-lint = ["clippy", "--all", "--all-features", "--examples", "--workspace",
8-
"--tests", "--", "--D", "warnings"]
6+
"--tests", "--", "--D", "warnings"]

.github/workflows/heavy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ jobs:
3030
- uses: actions-rs/cargo@v1
3131
with:
3232
command: integ-test
33-
args: -c "--release" -c "--features=save_wf_inputs" -t heavy_tests -- --test-threads 1
33+
args: -c "--release" -t heavy_tests -- --test-threads 1

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ This repo is composed of multiple crates:
3434
- temporal-sdk-core `./core` - The Core implementation
3535
- temporal-sdk `./sdk` - A (currently prototype) Rust SDK built on top of Core. Used for testing.
3636
- rustfsm `./fsm` - Implements a procedural macro used by core for defining state machines
37-
(contains subcrates). It is temporal agnostic.
37+
(contains subcrates). It is temporal agnostic.
3838

3939
Visualized (dev dependencies are in blue):
4040

@@ -51,7 +51,7 @@ You can build and test the project using cargo:
5151
Run integ tests with `cargo integ-test`. By default it will start an ephemeral server. You can also
5252
use an already-running server by passing `-s external`.
5353

54-
Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.
54+
Run load tests with `cargo test --test heavy_tests`.
5555

5656
## Formatting
5757

@@ -86,8 +86,8 @@ equivalent.
8686

8787
## Proto files
8888

89-
This repo uses a subtree for upstream protobuf files. The path `sdk-core-protos/protos/api_upstream` is a
90-
subtree. To update it, use:
89+
This repo uses a subtree for upstream protobuf files. The path `sdk-core-protos/protos/api_upstream`
90+
is a subtree. To update it, use:
9191

9292
`git pull --squash --rebase=false -s subtree ssh://git@github.com/temporalio/api.git master --allow-unrelated-histories`
9393

core-api/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@ derive_builder = { workspace = true }
2121
derive_more = { workspace = true }
2222
opentelemetry = { workspace = true, optional = true }
2323
prost-types = { workspace = true }
24-
serde = { version = "1.0", default_features = false, features = ["derive"] }
2524
serde_json = "1.0"
2625
thiserror = "1.0"
27-
tokio = "1.24"
2826
tonic = { workspace = true }
2927
tracing-core = "0.1"
3028
url = "2.3"

core-api/src/worker.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::time::Duration;
2-
use tokio::sync::mpsc::UnboundedSender;
32

43
const MAX_OUTSTANDING_WFT_DEFAULT: usize = 100;
54
const MAX_CONCURRENT_WFT_POLLS_DEFAULT: usize = 5;
65

76
/// Defines per-worker configuration options
8-
#[derive(Debug, Clone, derive_builder::Builder, serde::Serialize, serde::Deserialize)]
7+
#[derive(Debug, Clone, derive_builder::Builder)]
98
#[builder(setter(into), build_fn(validate = "Self::validate"))]
109
#[non_exhaustive]
1110
pub struct WorkerConfig {
@@ -115,14 +114,6 @@ pub struct WorkerConfig {
115114
#[builder(default = "5")]
116115
pub fetching_concurrency: usize,
117116

118-
// TODO: Move this out - dependency on tokio should not exist just for this
119-
/// If set, and the `save_wf_inputs` feature is enabled in core, will be sent a serialized
120-
/// instance of every input to workflow state in order. This is for testing purposes, SDK
121-
/// implementations never need to care about it.
122-
#[builder(default)]
123-
#[serde(skip)]
124-
pub wf_state_inputs: Option<UnboundedSender<Vec<u8>>>,
125-
126117
/// If set, core will issue cancels for all outstanding activities after shutdown has been
127118
/// initiated and this amount of time has elapsed.
128119
#[builder(default)]

core/Cargo.toml

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ categories = ["development-tools"]
1414

1515
[features]
1616
default = ["otel"]
17-
# Do not enable this feature when building production SDKs. If we ever want a user in the field to
18-
# record WF input data, we can build them a custom SDK or they can build - it adds significant extra
19-
# code size in the form of [de]serializers.
20-
save_wf_inputs = ["rmp-serde", "temporal-sdk-core-protos/serde_serialize"]
2117
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp",
2218
"dep:opentelemetry-prometheus", "dep:hyper", "dep:hyper-util", "dep:http-body-util"]
2319
tokio-console = ["console-subscriber"]
@@ -59,7 +55,6 @@ prost-types = { version = "0.5", package = "prost-wkt-types" }
5955
rand = "0.8.3"
6056
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls", "tokio-rustls"], default-features = false, optional = true }
6157
ringbuf = "0.3"
62-
rmp-serde = { version = "1.1", optional = true }
6358
serde = "1.0"
6459
serde_json = "1.0"
6560
siphasher = "1.0"
@@ -115,7 +110,6 @@ test = false
115110
name = "heavy_tests"
116111
path = "../tests/heavy_tests.rs"
117112
test = false
118-
required-features = ["save_wf_inputs"]
119113

120114
[[bench]]
121115
name = "workflow_replay"
@@ -126,8 +120,3 @@ harness = false
126120
[[example]]
127121
name = "integ_runner"
128122
path = "../tests/runner.rs"
129-
130-
[[example]]
131-
name = "wf_input_replay"
132-
path = "../tests/wf_input_replay.rs"
133-
required-features = ["save_wf_inputs"]

core/src/abstractions.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -222,18 +222,6 @@ impl OwnedMeteredSemPermit {
222222

223223
#[derive(Debug)]
224224
pub(crate) struct UsedMeteredSemPermit(OwnedMeteredSemPermit);
225-
impl UsedMeteredSemPermit {
226-
#[cfg(feature = "save_wf_inputs")]
227-
pub(crate) fn fake_deserialized() -> Self {
228-
let sem = Arc::new(Semaphore::new(1));
229-
let inner = sem.try_acquire_owned().unwrap();
230-
Self(OwnedMeteredSemPermit {
231-
inner,
232-
unused_claimants: None,
233-
record_fn: Box::new(|_| {}),
234-
})
235-
}
236-
}
237225

238226
macro_rules! dbg_panic {
239227
($($arg:tt)*) => {

core/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ pub use temporal_sdk_core_api as api;
3838
pub use temporal_sdk_core_protos as protos;
3939
pub use temporal_sdk_core_protos::TaskToken;
4040
pub use url::Url;
41-
#[cfg(feature = "save_wf_inputs")]
42-
pub use worker::replay_wf_state_inputs;
4341
pub use worker::{Worker, WorkerConfig, WorkerConfigBuilder};
4442

4543
use crate::{

core/src/protosext/protocol_messages.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ use temporal_sdk_core_protos::temporal::api::{
88

99
/// A decoded & verified of a [Message] that came with a WFT.
1010
#[derive(Debug, Clone, PartialEq)]
11-
#[cfg_attr(
12-
feature = "save_wf_inputs",
13-
derive(serde::Serialize, serde::Deserialize)
14-
)]
1511
pub struct IncomingProtocolMessage {
1612
pub id: String,
1713
pub protocol_instance_id: String,
@@ -44,10 +40,6 @@ impl TryFrom<Message> for IncomingProtocolMessage {
4440
/// All the protocol [Message] bodies Core understands that might come to us when receiving a new
4541
/// WFT.
4642
#[derive(Debug, Clone, PartialEq)]
47-
#[cfg_attr(
48-
feature = "save_wf_inputs",
49-
derive(serde::Serialize, serde::Deserialize)
50-
)]
5143
pub enum IncomingProtocolMessageBody {
5244
UpdateRequest(UpdateRequest),
5345
}
@@ -71,10 +63,6 @@ impl TryFrom<Option<prost_types::Any>> for IncomingProtocolMessageBody {
7163
}
7264

7365
#[derive(Debug, Clone, PartialEq)]
74-
#[cfg_attr(
75-
feature = "save_wf_inputs",
76-
derive(serde::Serialize, serde::Deserialize)
77-
)]
7866
pub struct UpdateRequest {
7967
pub name: String,
8068
pub headers: HashMap<String, Payload>,

core/src/worker/activities/local_activities.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ pub(crate) struct LocalInFlightActInfo {
5555
}
5656

5757
#[derive(Debug, Clone)]
58-
#[cfg_attr(
59-
feature = "save_wf_inputs",
60-
derive(serde::Serialize, serde::Deserialize)
61-
)]
6258
pub(crate) enum LocalActivityExecutionResult {
6359
Completed(Success),
6460
Failed(ActFail),
@@ -104,10 +100,6 @@ impl LocalActivityExecutionResult {
104100
}
105101

106102
#[derive(Debug, Clone)]
107-
#[cfg_attr(
108-
feature = "save_wf_inputs",
109-
derive(serde::Serialize, serde::Deserialize)
110-
)]
111103
pub(crate) struct LocalActivityResolution {
112104
pub seq: u32,
113105
pub result: LocalActivityExecutionResult,

0 commit comments

Comments
 (0)