Skip to content

Commit ae85e4c

Browse files
authored
Fix new clippy lints & integ test recompiles (#923)
1 parent 8ad1154 commit ae85e4c

File tree

16 files changed

+154
-129
lines changed

16 files changed

+154
-129
lines changed

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
CLI_VERSION_OVERRIDE = "v1.3.1-priority.0"
55

66
[alias]
7-
integ-test = ["run", "--package", "temporal-sdk-core", "--example", "integ_runner", "--"]
7+
integ-test = ["test", "--features", "temporal-sdk-core-protos/serde_serialize", "--package", "temporal-sdk-core", "--test", "integ_runner", "--"]
88
lint = ["clippy", "--workspace", "--examples", "--all-features",
99
"--test", "integ_tests", "--test", "heavy_tests", "--test", "manual_tests",
1010
"--", "--D", "warnings"]

.github/workflows/per-pr.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
submodules: recursive
2222
- uses: dtolnay/rust-toolchain@stable
2323
with:
24-
toolchain: 1.85.0
24+
toolchain: 1.87.0
2525
- name: Install protoc
2626
uses: arduino/setup-protoc@v3
2727
with:
@@ -56,7 +56,7 @@ jobs:
5656
- uses: actions/checkout@v4
5757
- uses: dtolnay/rust-toolchain@stable
5858
with:
59-
toolchain: 1.85.0
59+
toolchain: 1.87.0
6060
- name: Install protoc
6161
uses: arduino/setup-protoc@v3
6262
with:
@@ -95,7 +95,7 @@ jobs:
9595
- uses: actions/checkout@v4
9696
- uses: dtolnay/rust-toolchain@stable
9797
with:
98-
toolchain: 1.85.0
98+
toolchain: 1.87.0
9999
- name: Install protoc
100100
uses: arduino/setup-protoc@v3
101101
with:
@@ -120,7 +120,7 @@ jobs:
120120
- uses: actions/checkout@v4
121121
- uses: dtolnay/rust-toolchain@stable
122122
with:
123-
toolchain: 1.85.0
123+
toolchain: 1.87.0
124124
- name: Install protoc
125125
uses: arduino/setup-protoc@v3
126126
with:
@@ -145,7 +145,7 @@ jobs:
145145
- uses: actions/checkout@v4
146146
- uses: dtolnay/rust-toolchain@stable
147147
with:
148-
toolchain: 1.85.0
148+
toolchain: 1.87.0
149149
- name: Install protoc
150150
uses: arduino/setup-protoc@v3
151151
with:

AGENTS.md

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Contributor Guidance for `sdk-core`
22

3-
This repository provides a Rust workspace for the Temporal Core SDK and related crates. Use this document as your quick reference when submitting pull requests.
3+
This repository provides a Rust workspace for the Temporal Core SDK and related crates. Use this
4+
document as your quick reference when submitting pull requests.
45

56
## Where Things Are
67

@@ -18,12 +19,13 @@ This repository provides a Rust workspace for the Temporal Core SDK and related
1819
## Repo Specific Utilities
1920

2021
- `.cargo/config.toml` defines useful cargo aliases:
21-
- `cargo lint` – run clippy on workspace crates
22-
- `cargo test-lint` – run clippy on tests
23-
- `cargo integ-test` – run the integration test runner
22+
- `cargo lint` – run clippy on workspace crates
23+
- `cargo test-lint` – run clippy on tests
24+
- `cargo integ-test` – run the integration test runner
2425
- `cargo-tokio-console.sh` – run any cargo command with the `tokio-console` feature
2526
- `integ-with-otel.sh` – run integration tests with OpenTelemetry enabled
26-
- `.cargo/multi-worker-manual-test` – helper script for spawning multiple workers during manual testing
27+
- `.cargo/multi-worker-manual-test` – helper script for spawning multiple workers during manual
28+
testing
2729

2830
## Building and Testing
2931

@@ -33,7 +35,7 @@ The following commands are enforced for each pull request (see `README.md`):
3335
cargo build # build all crates
3436
cargo test # run unit tests
3537
cargo integ-test # integration tests (starts ephemeral server by default)
36-
cargo test --test heavy_tests # load tests
38+
cargo test --test heavy_tests # load tests -- agents do not need to run this and should not
3739
```
3840

3941
Additional checks:
@@ -48,7 +50,8 @@ Documentation can be generated with `cargo doc`.
4850
## Expectations for Pull Requests
4951

5052
- Format and lint your code before submitting.
51-
- Ensure all tests pass locally. Integration tests may require a running Temporal server or the ephemeral server started by `cargo integ-test`.
53+
- Ensure all tests pass locally. Integration tests may require a running Temporal server or the
54+
ephemeral server started by `cargo integ-test`.
5255
- Keep commit messages short and in the imperative mood.
5356
- Provide a clear PR description outlining what changed and why.
5457
- Reviewers expect new features or fixes to include corresponding tests when applicable.
@@ -64,5 +67,7 @@ Reviewers will look for:
6467

6568
## Notes
6669

67-
- Fetch workflow histories with `cargo run --bin histfetch <workflow_id> [run_id]` (binary lives in `test-utils`).
68-
- Protobuf files under `sdk-core-protos/protos/api_upstream` are a git subtree; see `README.md` for update instructions.
70+
- Fetch workflow histories with `cargo run --bin histfetch <workflow_id> [run_id]` (binary lives in
71+
`test-utils`).
72+
- Protobuf files under `sdk-core-protos/protos/api_upstream` are a git subtree; see `README.md` for
73+
update instructions.

core/Cargo.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,16 @@ test = false
129129
name = "workflow_replay"
130130
harness = false
131131

132-
# This is maybe a bit hacky, but we call the runner an "example" because that gets it compiling with
133-
# the dev-dependencies, which we want.
134-
[[example]]
132+
# The integration test runner should compile with the same configuration as the
133+
# rest of the integration tests so that artifacts are shared and no additional
134+
# compilation is required when switching between using the runner and running
135+
# tests directly.
136+
[[test]]
135137
name = "integ_runner"
136138
path = "../tests/runner.rs"
139+
harness = false
140+
test = false
141+
required-features = ["temporal-sdk-core-protos/serde_serialize"]
137142

138143
[lints]
139144
workspace = true

core/src/ephemeral_server/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ async fn download_and_extract(
546546
// We have to map the error type to an io error
547547
let stream = resp
548548
.bytes_stream()
549-
.map(|item| item.map_err(|err| io::Error::new(io::ErrorKind::Other, err)));
549+
.map(|item| item.map_err(io::Error::other));
550550

551551
// Since our tar/zip impls use sync IO, we have to create a bridge and run
552552
// in a blocking closure.

core/src/internal_flags.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub(crate) enum CoreInternalFlags {
4040
TooHigh = u32::MAX,
4141
}
4242

43+
#[allow(clippy::large_enum_variant)]
4344
#[derive(Debug, Clone, PartialEq, Eq)]
4445
pub(crate) enum InternalFlags {
4546
Enabled {

core/src/protosext/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ pub(crate) struct CompleteLocalActivityData {
260260
pub(crate) result: Result<Payload, Failure>,
261261
}
262262

263+
#[allow(clippy::result_large_err)]
263264
pub(crate) fn validate_activity_completion(
264265
status: &activity_execution_result::Status,
265266
) -> Result<(), CompleteActivityError> {

core/src/worker/activities.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ pub(crate) struct WorkerActivityTasks {
175175
#[derive(derive_more::From)]
176176
enum ActivityTaskSource {
177177
PendingCancel(PendingActivityCancel),
178-
PendingStart(Result<(PermittedTqResp<PollActivityTaskQueueResponse>, bool), PollError>),
178+
PendingStart(Box<Result<(PermittedTqResp<PollActivityTaskQueueResponse>, bool), PollError>>),
179179
}
180180

181181
impl WorkerActivityTasks {
@@ -209,7 +209,7 @@ impl WorkerActivityTasks {
209209
let complete_notify = Arc::new(Notify::new());
210210
let source_stream = stream::select_with_strategy(
211211
UnboundedReceiverStream::new(cancels_rx).map(ActivityTaskSource::from),
212-
starts_stream.map(ActivityTaskSource::from),
212+
starts_stream.map(|a| ActivityTaskSource::from(Box::new(a))),
213213
|_: &mut ()| PollNext::Left,
214214
);
215215

core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,7 @@ impl TimeoutBag {
893893
/// as request to schedule it arrives.
894894
///
895895
/// Returns error in the event the activity is *already* timed out
896+
#[allow(clippy::result_large_err)]
896897
fn new(
897898
new_la: &NewLocalAct,
898899
cancel_chan: UnboundedSender<CancelOrTimeout>,

core/src/worker/nexus.rs

Lines changed: 72 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl NexusManager {
7171
let task_stream_input = stream::select_with_strategy(
7272
UnboundedReceiverStream::new(cancels_rx).map(TaskStreamInput::from),
7373
source_stream
74-
.map(TaskStreamInput::from)
74+
.map(|p| TaskStreamInput::from(Box::new(p)))
7575
.chain(stream::once(async move { TaskStreamInput::SourceComplete })),
7676
|_: &mut ()| PollNext::Left,
7777
);
@@ -244,90 +244,92 @@ where
244244
self.source_stream
245245
.filter_map(move |t| {
246246
let res = match t {
247-
TaskStreamInput::Poll(Ok(t)) => {
248-
if let Some(dur) = t.resp.sched_to_start() {
249-
self.metrics.nexus_task_sched_to_start_latency(dur);
250-
};
247+
TaskStreamInput::Poll(t) => match *t {
248+
Ok(t) => {
249+
if let Some(dur) = t.resp.sched_to_start() {
250+
self.metrics.nexus_task_sched_to_start_latency(dur);
251+
};
251252

252-
let tt = TaskToken(t.resp.task_token.clone());
253-
let mut timeout_task = None;
254-
if let Some(timeout_str) = t
255-
.resp
256-
.request
257-
.as_ref()
258-
.and_then(|r| r.header.get(REQUEST_TIMEOUT_HEADER))
259-
{
260-
if let Ok(timeout_dur) = parse_request_timeout(timeout_str) {
261-
let tt_clone = tt.clone();
262-
let cancels_tx = self.cancels_tx.clone();
263-
timeout_task = Some(tokio::task::spawn(async move {
264-
tokio::time::sleep(timeout_dur).await;
265-
debug!(
253+
let tt = TaskToken(t.resp.task_token.clone());
254+
let mut timeout_task = None;
255+
if let Some(timeout_str) = t
256+
.resp
257+
.request
258+
.as_ref()
259+
.and_then(|r| r.header.get(REQUEST_TIMEOUT_HEADER))
260+
{
261+
if let Ok(timeout_dur) = parse_request_timeout(timeout_str) {
262+
let tt_clone = tt.clone();
263+
let cancels_tx = self.cancels_tx.clone();
264+
timeout_task = Some(tokio::task::spawn(async move {
265+
tokio::time::sleep(timeout_dur).await;
266+
debug!(
266267
task_token=%tt_clone,
267268
"Timing out nexus task due to elapsed local timeout timer"
268269
);
269-
let _ = cancels_tx.send(CancelNexusTask {
270-
task_token: tt_clone.0,
271-
reason: NexusTaskCancelReason::TimedOut.into(),
272-
});
273-
}));
274-
} else {
275-
// This could auto-respond and fail the nexus task, but given that
276-
// the server is going to try to parse this as well, and all we're
277-
// doing with this parsing is notifying the handler of a local
278-
// timeout, it seems reasonable to rely on server to handle this.
279-
warn!(
270+
let _ = cancels_tx.send(CancelNexusTask {
271+
task_token: tt_clone.0,
272+
reason: NexusTaskCancelReason::TimedOut.into(),
273+
});
274+
}));
275+
} else {
276+
// This could auto-respond and fail the nexus task, but given that
277+
// the server is going to try to parse this as well, and all we're
278+
// doing with this parsing is notifying the handler of a local
279+
// timeout, it seems reasonable to rely on server to handle this.
280+
warn!(
280281
"Failed to parse nexus timeout header value '{}'",
281282
timeout_str
282283
);
284+
}
283285
}
284-
}
285286

286-
let (service, operation, request_kind) = t
287-
.resp
288-
.request
289-
.as_ref()
290-
.and_then(|r| r.variant.as_ref())
291-
.map(|v| match v {
292-
Variant::StartOperation(s) => (
293-
s.service.to_owned(),
294-
s.operation.to_owned(),
295-
RequestKind::Start,
296-
),
297-
Variant::CancelOperation(c) => (
298-
c.service.to_owned(),
299-
c.operation.to_owned(),
300-
RequestKind::Cancel,
301-
),
302-
})
303-
.unwrap_or_default();
304-
self.outstanding_task_map.lock().insert(
305-
tt,
306-
NexusInFlightTask {
307-
request_kind,
308-
timeout_task,
309-
scheduled_time: t
310-
.resp
311-
.request
312-
.as_ref()
313-
.and_then(|r| r.scheduled_time)
314-
.and_then(|t| t.try_into().ok()),
315-
start_time: Instant::now(),
316-
_permit: t.permit.into_used(NexusSlotInfo { service, operation }),
317-
},
318-
);
319-
Some(Ok(NexusTask {
320-
variant: Some(nexus_task::Variant::Task(t.resp)),
321-
}))
322-
}
287+
let (service, operation, request_kind) = t
288+
.resp
289+
.request
290+
.as_ref()
291+
.and_then(|r| r.variant.as_ref())
292+
.map(|v| match v {
293+
Variant::StartOperation(s) => (
294+
s.service.to_owned(),
295+
s.operation.to_owned(),
296+
RequestKind::Start,
297+
),
298+
Variant::CancelOperation(c) => (
299+
c.service.to_owned(),
300+
c.operation.to_owned(),
301+
RequestKind::Cancel,
302+
),
303+
})
304+
.unwrap_or_default();
305+
self.outstanding_task_map.lock().insert(
306+
tt,
307+
NexusInFlightTask {
308+
request_kind,
309+
timeout_task,
310+
scheduled_time: t
311+
.resp
312+
.request
313+
.as_ref()
314+
.and_then(|r| r.scheduled_time)
315+
.and_then(|t| t.try_into().ok()),
316+
start_time: Instant::now(),
317+
_permit: t.permit.into_used(NexusSlotInfo { service, operation }),
318+
},
319+
);
320+
Some(Ok(NexusTask {
321+
variant: Some(nexus_task::Variant::Task(t.resp)),
322+
}))
323+
},
324+
Err(e) => Some(Err(PollError::TonicError(e)))
325+
},
323326
TaskStreamInput::Cancel(c) => Some(Ok(NexusTask {
324327
variant: Some(nexus_task::Variant::CancelTask(c)),
325328
})),
326329
TaskStreamInput::SourceComplete => {
327330
source_done.cancel();
328331
None
329332
}
330-
TaskStreamInput::Poll(Err(e)) => Some(Err(PollError::TonicError(e))),
331333
};
332334
async move { res }
333335
})
@@ -379,7 +381,7 @@ enum RequestKind {
379381

380382
#[derive(derive_more::From)]
381383
enum TaskStreamInput {
382-
Poll(NexusPollItem),
384+
Poll(Box<NexusPollItem>),
383385
Cancel(CancelNexusTask),
384386
SourceComplete,
385387
}

0 commit comments

Comments
 (0)