Skip to content

Commit 7ac4445

Browse files
authored
Expose resource based auto-tuner options (#559)
1 parent 2c1ac54 commit 7ac4445

File tree

10 files changed

+785
-170
lines changed

10 files changed

+785
-170
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 209 additions & 123 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ name = "temporal_sdk_bridge"
88
crate-type = ["cdylib"]
99

1010
[dependencies]
11+
anyhow = "1.0"
1112
futures = "0.3"
1213
log = "0.4"
1314
once_cell = "1.16"
1415
prost = "0.12"
1516
prost-types = "0.12"
16-
pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
17-
pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] }
18-
pythonize = "0.19"
17+
pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38", "anyhow"] }
18+
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
19+
pythonize = "0.20"
1920
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
2021
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
2122
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }

temporalio/bridge/src/runtime.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use futures::channel::mpsc::Receiver;
22
use pyo3::exceptions::{PyRuntimeError, PyValueError};
33
use pyo3::prelude::*;
4-
use pyo3::AsPyPointer;
54
use pythonize::pythonize;
65
use std::collections::HashMap;
76
use std::future::Future;

temporalio/bridge/src/worker.rs

Lines changed: 131 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use anyhow::Context;
12
use prost::Message;
23
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
34
use pyo3::prelude::*;
@@ -34,9 +35,7 @@ pub struct WorkerConfig {
3435
build_id: String,
3536
identity_override: Option<String>,
3637
max_cached_workflows: usize,
37-
max_outstanding_workflow_tasks: usize,
38-
max_outstanding_activities: usize,
39-
max_outstanding_local_activities: usize,
38+
tuner: TunerHolder,
4039
max_concurrent_workflow_task_polls: usize,
4140
nonsticky_to_sticky_poll_ratio: f32,
4241
max_concurrent_activity_task_polls: usize,
@@ -52,6 +51,39 @@ pub struct WorkerConfig {
5251
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
5352
}
5453

54+
#[derive(FromPyObject)]
55+
pub struct TunerHolder {
56+
workflow_slot_supplier: SlotSupplier,
57+
activity_slot_supplier: SlotSupplier,
58+
local_activity_slot_supplier: SlotSupplier,
59+
}
60+
61+
#[derive(FromPyObject)]
62+
pub enum SlotSupplier {
63+
FixedSize(FixedSizeSlotSupplier),
64+
ResourceBased(ResourceBasedSlotSupplier),
65+
}
66+
67+
#[derive(FromPyObject)]
68+
pub struct FixedSizeSlotSupplier {
69+
num_slots: usize,
70+
}
71+
72+
#[derive(FromPyObject)]
73+
pub struct ResourceBasedSlotSupplier {
74+
minimum_slots: usize,
75+
maximum_slots: usize,
76+
// Need pyo3 0.21+ for this to be std Duration
77+
ramp_throttle_ms: u64,
78+
tuner_config: ResourceBasedTunerConfig,
79+
}
80+
81+
#[derive(FromPyObject, Clone, Copy, PartialEq)]
82+
pub struct ResourceBasedTunerConfig {
83+
target_memory_usage: f64,
84+
target_cpu_usage: f64,
85+
}
86+
5587
macro_rules! enter_sync {
5688
($runtime:expr) => {
5789
if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() {
@@ -73,7 +105,7 @@ pub fn new_worker(
73105
config,
74106
client.retry_client.clone().into_inner(),
75107
)
76-
.map_err(|err| PyValueError::new_err(format!("Failed creating worker: {}", err)))?;
108+
.context("Failed creating worker")?;
77109
Ok(WorkerRef {
78110
worker: Some(Arc::new(worker)),
79111
runtime: runtime_ref.runtime.clone(),
@@ -107,9 +139,11 @@ impl WorkerRef {
107139
fn validate<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
108140
let worker = self.worker.as_ref().unwrap().clone();
109141
self.runtime.future_into_py(py, async move {
110-
worker.validate().await.map_err(|err| {
111-
PyRuntimeError::new_err(format!("Worker validation failed: {}", err))
112-
})
142+
worker
143+
.validate()
144+
.await
145+
.context("Worker validation failed")
146+
.map_err(Into::into)
113147
})
114148
}
115149

@@ -151,10 +185,8 @@ impl WorkerRef {
151185
worker
152186
.complete_workflow_activation(completion)
153187
.await
154-
.map_err(|err| {
155-
// TODO(cretz): More error types
156-
PyRuntimeError::new_err(format!("Completion failure: {}", err))
157-
})
188+
.context("Completion failure")
189+
.map_err(Into::into)
158190
})
159191
}
160192

@@ -166,10 +198,8 @@ impl WorkerRef {
166198
worker
167199
.complete_activity_task(completion)
168200
.await
169-
.map_err(|err| {
170-
// TODO(cretz): More error types
171-
PyRuntimeError::new_err(format!("Completion failure: {}", err))
172-
})
201+
.context("Completion failure")
202+
.map_err(Into::into)
173203
})
174204
}
175205

@@ -226,16 +256,15 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
226256
type Error = PyErr;
227257

228258
fn try_from(conf: WorkerConfig) -> PyResult<Self> {
259+
let converted_tuner: temporal_sdk_core::TunerHolder = conf.tuner.try_into()?;
229260
temporal_sdk_core::WorkerConfigBuilder::default()
230261
.namespace(conf.namespace)
231262
.task_queue(conf.task_queue)
232263
.worker_build_id(conf.build_id)
233264
.client_identity_override(conf.identity_override)
234265
.max_cached_workflows(conf.max_cached_workflows)
235-
.max_outstanding_workflow_tasks(conf.max_outstanding_workflow_tasks)
236-
.max_outstanding_activities(conf.max_outstanding_activities)
237-
.max_outstanding_local_activities(conf.max_outstanding_local_activities)
238266
.max_concurrent_wft_polls(conf.max_concurrent_workflow_task_polls)
267+
.tuner(Arc::new(converted_tuner))
239268
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
240269
.max_concurrent_at_polls(conf.max_concurrent_activity_task_polls)
241270
.no_remote_activities(conf.no_remote_activities)
@@ -276,6 +305,90 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
276305
}
277306
}
278307

308+
impl TryFrom<TunerHolder> for temporal_sdk_core::TunerHolder {
309+
type Error = PyErr;
310+
311+
fn try_from(holder: TunerHolder) -> PyResult<Self> {
312+
// Verify all resource-based options are the same if any are set
313+
let maybe_wf_resource_opts =
314+
if let SlotSupplier::ResourceBased(ref ss) = holder.workflow_slot_supplier {
315+
Some(&ss.tuner_config)
316+
} else {
317+
None
318+
};
319+
let maybe_act_resource_opts =
320+
if let SlotSupplier::ResourceBased(ref ss) = holder.activity_slot_supplier {
321+
Some(&ss.tuner_config)
322+
} else {
323+
None
324+
};
325+
let maybe_local_act_resource_opts =
326+
if let SlotSupplier::ResourceBased(ref ss) = holder.local_activity_slot_supplier {
327+
Some(&ss.tuner_config)
328+
} else {
329+
None
330+
};
331+
let all_resource_opts = [
332+
maybe_wf_resource_opts,
333+
maybe_act_resource_opts,
334+
maybe_local_act_resource_opts,
335+
];
336+
let mut set_resource_opts = all_resource_opts.iter().flatten();
337+
let first = set_resource_opts.next();
338+
let all_are_same = if let Some(first) = first {
339+
set_resource_opts.all(|elem| elem == first)
340+
} else {
341+
true
342+
};
343+
if !all_are_same {
344+
return Err(PyValueError::new_err(
345+
"All resource-based slot suppliers must have the same ResourceBasedTunerOptions",
346+
));
347+
}
348+
349+
let mut options = temporal_sdk_core::TunerHolderOptionsBuilder::default();
350+
if let Some(first) = first {
351+
options.resource_based_options(
352+
temporal_sdk_core::ResourceBasedSlotsOptionsBuilder::default()
353+
.target_mem_usage(first.target_memory_usage)
354+
.target_cpu_usage(first.target_cpu_usage)
355+
.build()
356+
.expect("Building ResourceBasedSlotsOptions is infallible"),
357+
);
358+
};
359+
options
360+
.workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
361+
.activity_slot_options(holder.activity_slot_supplier.try_into()?)
362+
.local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?);
363+
Ok(options
364+
.build()
365+
.map_err(|e| PyValueError::new_err(format!("Invalid tuner holder options: {}", e)))?
366+
.build_tuner_holder()
367+
.context("Failed building tuner holder")?)
368+
}
369+
}
370+
371+
impl TryFrom<SlotSupplier> for temporal_sdk_core::SlotSupplierOptions {
372+
type Error = PyErr;
373+
374+
fn try_from(supplier: SlotSupplier) -> PyResult<temporal_sdk_core::SlotSupplierOptions> {
375+
Ok(match supplier {
376+
SlotSupplier::FixedSize(fs) => temporal_sdk_core::SlotSupplierOptions::FixedSize {
377+
slots: fs.num_slots,
378+
},
379+
SlotSupplier::ResourceBased(ss) => {
380+
temporal_sdk_core::SlotSupplierOptions::ResourceBased(
381+
temporal_sdk_core::ResourceSlotOptions::new(
382+
ss.minimum_slots,
383+
ss.maximum_slots,
384+
Duration::from_millis(ss.ramp_throttle_ms),
385+
),
386+
)
387+
}
388+
})
389+
}
390+
}
391+
279392
/// For feeding histories into core during replay
280393
#[pyclass]
281394
pub struct HistoryPusher {

temporalio/bridge/worker.py

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
from dataclasses import dataclass
9+
from datetime import timedelta
910
from typing import (
1011
TYPE_CHECKING,
1112
Awaitable,
@@ -15,6 +16,7 @@
1516
Sequence,
1617
Set,
1718
Tuple,
19+
Union,
1820
)
1921

2022
import google.protobuf.internal.containers
@@ -43,9 +45,7 @@ class WorkerConfig:
4345
build_id: str
4446
identity_override: Optional[str]
4547
max_cached_workflows: int
46-
max_outstanding_workflow_tasks: int
47-
max_outstanding_activities: int
48-
max_outstanding_local_activities: int
48+
tuner: TunerHolder
4949
max_concurrent_workflow_task_polls: int
5050
nonsticky_to_sticky_poll_ratio: float
5151
max_concurrent_activity_task_polls: int
@@ -61,6 +61,43 @@ class WorkerConfig:
6161
nondeterminism_as_workflow_fail_for_types: Set[str]
6262

6363

64+
@dataclass
class ResourceBasedTunerConfig:
    """Bridge mirror of the Rust ``ResourceBasedTunerConfig`` struct.

    Holds the two targets that configure a resource-based tuner.
    """

    # Target memory usage handed to the tuner.
    # NOTE(review): valid range is not visible here — confirm.
    target_memory_usage: float
    # Target CPU usage handed to the tuner.
    # NOTE(review): valid range is not visible here — confirm.
    target_cpu_usage: float
70+
71+
72+
@dataclass
class ResourceBasedSlotSupplier:
    """Bridge mirror of the Rust ``ResourceBasedSlotSupplier`` struct."""

    # Lower slot bound passed through to the Rust side.
    minimum_slots: int
    # Upper slot bound passed through to the Rust side.
    maximum_slots: int
    # Ramp throttle in whole milliseconds; the Rust side converts it to a Duration.
    ramp_throttle_ms: int
    # Tuner targets for this supplier.
    tuner_config: ResourceBasedTunerConfig
80+
81+
82+
@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """Bridge mirror of the Rust ``FixedSizeSlotSupplier`` struct.

    Instances are immutable because the dataclass is frozen.
    """

    # Number of slots the supplier hands out.
    num_slots: int
87+
88+
89+
SlotSupplier: TypeAlias = Union[FixedSizeSlotSupplier, ResourceBasedSlotSupplier]
90+
91+
92+
@dataclass
class TunerHolder:
    """Bridge mirror of the Rust ``TunerHolder`` struct.

    Carries one slot supplier per kind of task.
    """

    # Supplier for workflow task slots.
    workflow_slot_supplier: SlotSupplier
    # Supplier for activity task slots.
    activity_slot_supplier: SlotSupplier
    # Supplier for local activity slots.
    local_activity_slot_supplier: SlotSupplier
99+
100+
64101
class Worker:
65102
"""SDK Core worker."""
66103

temporalio/worker/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
WorkflowReplayResult,
2727
WorkflowReplayResults,
2828
)
29+
from ._tuning import (
30+
FixedSizeSlotSupplier,
31+
ResourceBasedSlotConfig,
32+
ResourceBasedSlotSupplier,
33+
ResourceBasedTunerConfig,
34+
WorkerTuner,
35+
)
2936
from ._worker import Worker, WorkerConfig
3037
from ._workflow_instance import (
3138
UnsandboxedWorkflowRunner,
@@ -69,4 +76,10 @@
6976
"WorkflowInstance",
7077
"WorkflowInstanceDetails",
7178
"UnsandboxedWorkflowRunner",
79+
# Tuning types
80+
"WorkerTuner",
81+
"FixedSizeSlotSupplier",
82+
"ResourceBasedSlotSupplier",
83+
"ResourceBasedTunerConfig",
84+
"ResourceBasedSlotConfig",
7285
]

temporalio/worker/_replayer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,17 @@ def on_eviction_hook(
223223
nondeterminism_as_workflow_fail_for_types=workflow_worker.nondeterminism_as_workflow_fail_for_types(),
224224
# All values below are ignored but required by Core
225225
max_cached_workflows=2,
226-
max_outstanding_workflow_tasks=2,
227-
max_outstanding_activities=1,
228-
max_outstanding_local_activities=1,
226+
tuner=temporalio.bridge.worker.TunerHolder(
227+
workflow_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
228+
2
229+
),
230+
activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
231+
1
232+
),
233+
local_activity_slot_supplier=temporalio.bridge.worker.FixedSizeSlotSupplier(
234+
1
235+
),
236+
),
229237
max_concurrent_workflow_task_polls=1,
230238
nonsticky_to_sticky_poll_ratio=1,
231239
max_concurrent_activity_task_polls=1,

0 commit comments

Comments
 (0)