Skip to content

Commit e479c8a

Browse files
committed
Fix compile
1 parent b43165e commit e479c8a

File tree

1 file changed

+66
-8
lines changed

1 file changed

+66
-8
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ use temporal_sdk_core::api::errors::PollError;
1515
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
1616
use temporal_sdk_core_api::errors::WorkflowErrorType;
1717
use temporal_sdk_core_api::worker::{
18-
SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
19-
SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
18+
PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
19+
SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait,
20+
SlotSupplierPermit,
2021
};
2122
use temporal_sdk_core_api::Worker;
2223
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
@@ -44,7 +45,7 @@ pub struct WorkerRef {
4445
pub struct WorkerConfig {
4546
namespace: String,
4647
task_queue: String,
47-
build_id: String,
48+
versioning_strategy: WorkerVersioningStrategy,
4849
identity_override: Option<String>,
4950
max_cached_workflows: usize,
5051
tuner: TunerHolder,
@@ -58,11 +59,34 @@ pub struct WorkerConfig {
5859
max_activities_per_second: Option<f64>,
5960
max_task_queue_activities_per_second: Option<f64>,
6061
graceful_shutdown_period_millis: u64,
61-
use_worker_versioning: bool,
6262
nondeterminism_as_workflow_fail: bool,
6363
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
6464
}
6565

66+
/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy]
67+
#[derive(FromPyObject)]
68+
pub enum WorkerVersioningStrategy {
69+
None { build_id: String },
70+
WorkerDeploymentBased(WorkerDeploymentOptions),
71+
LegacyBuildIdBased { build_id: String },
72+
}
73+
74+
/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentOptions]
75+
#[derive(FromPyObject)]
76+
pub struct WorkerDeploymentOptions {
77+
pub version: WorkerDeploymentVersion,
78+
pub use_worker_versioning: bool,
79+
/// This is a [enums::v1::VersioningBehavior] represented as i32
80+
pub default_versioning_behavior: i32,
81+
}
82+
83+
/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentVersion]
84+
#[derive(FromPyObject)]
85+
pub struct WorkerDeploymentVersion {
86+
pub deployment_name: String,
87+
pub build_id: String,
88+
}
89+
6690
#[derive(FromPyObject)]
6791
pub struct TunerHolder {
6892
workflow_slot_supplier: SlotSupplier,
@@ -559,16 +583,21 @@ fn convert_worker_config(
559583
task_locals: Arc<OnceLock<pyo3_asyncio::TaskLocals>>,
560584
) -> PyResult<temporal_sdk_core::WorkerConfig> {
561585
let converted_tuner = convert_tuner_holder(conf.tuner, task_locals)?;
586+
let converted_versioning_strategy = convert_versioning_strategy(conf.versioning_strategy);
562587
temporal_sdk_core::WorkerConfigBuilder::default()
563588
.namespace(conf.namespace)
564589
.task_queue(conf.task_queue)
565-
.worker_build_id(conf.build_id)
590+
.versioning_strategy(converted_versioning_strategy)
566591
.client_identity_override(conf.identity_override)
567592
.max_cached_workflows(conf.max_cached_workflows)
568-
.max_concurrent_wft_polls(conf.max_concurrent_workflow_task_polls)
593+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
594+
conf.max_concurrent_workflow_task_polls,
595+
))
569596
.tuner(Arc::new(converted_tuner))
570597
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
571-
.max_concurrent_at_polls(conf.max_concurrent_activity_task_polls)
598+
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
599+
conf.max_concurrent_activity_task_polls,
600+
))
572601
.no_remote_activities(conf.no_remote_activities)
573602
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
574603
conf.sticky_queue_schedule_to_start_timeout_millis,
@@ -585,7 +614,6 @@ fn convert_worker_config(
585614
// auto-cancel-activity behavior of shutdown will not occur, so we
586615
// always set it even if 0.
587616
.graceful_shutdown_period(Duration::from_millis(conf.graceful_shutdown_period_millis))
588-
.use_worker_versioning(conf.use_worker_versioning)
589617
.workflow_failure_errors(if conf.nondeterminism_as_workflow_fail {
590618
HashSet::from([WorkflowErrorType::Nondeterminism])
591619
} else {
@@ -702,6 +730,36 @@ fn convert_slot_supplier<SK: SlotKind + Send + Sync + 'static>(
702730
})
703731
}
704732

733+
fn convert_versioning_strategy(
734+
strategy: WorkerVersioningStrategy,
735+
) -> temporal_sdk_core_api::worker::WorkerVersioningStrategy {
736+
match strategy {
737+
WorkerVersioningStrategy::None { build_id } => {
738+
temporal_sdk_core_api::worker::WorkerVersioningStrategy::None { build_id }
739+
}
740+
WorkerVersioningStrategy::WorkerDeploymentBased(worker_deployment_options) => {
741+
temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased(
742+
temporal_sdk_core_api::worker::WorkerDeploymentOptions {
743+
version: temporal_sdk_core_api::worker::WorkerDeploymentVersion {
744+
deployment_name: worker_deployment_options.version.deployment_name,
745+
build_id: worker_deployment_options.version.build_id,
746+
},
747+
use_worker_versioning: worker_deployment_options.use_worker_versioning,
748+
default_versioning_behavior: Some(
749+
worker_deployment_options
750+
.default_versioning_behavior
751+
.try_into()
752+
.unwrap_or_default(),
753+
),
754+
},
755+
)
756+
}
757+
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => {
758+
temporal_sdk_core_api::worker::WorkerVersioningStrategy::LegacyBuildIdBased { build_id }
759+
}
760+
}
761+
}
762+
705763
/// For feeding histories into core during replay
706764
#[pyclass]
707765
pub struct HistoryPusher {

0 commit comments

Comments
 (0)