Skip to content

Commit 5c3955f

Browse files
authored
User-implementable SlotSupplier (#1553)
1 parent 8386286 commit 5c3955f

File tree

14 files changed

+847
-70
lines changed

14 files changed

+847
-70
lines changed

packages/core-bridge/Cargo.lock

Lines changed: 13 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ lto = true
1717
incremental = false
1818

1919
[dependencies]
20+
async-trait = "0.1.83"
2021
futures = { version = "0.3", features = ["executor"] }
2122
log = "0.4"
22-
neon = { version = "1.0.0", default-features = false, features = ["napi-6"] }
23+
neon = { version = "1.0.0", default-features = false, features = ["napi-6", "futures"] }
2324
opentelemetry = "0.24"
2425
parking_lot = "0.12"
2526
prost = "0.13"

packages/core-bridge/src/conversions.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ use neon::{
55
prelude::*,
66
types::{JsBoolean, JsNumber, JsString},
77
};
8+
use slot_supplier_bridge::SlotSupplierBridge;
89
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
910
use temporal_client::HttpConnectProxyOptions;
11+
use temporal_sdk_core::api::worker::SlotKind;
1012
use temporal_sdk_core::{
1113
api::telemetry::{Logger, MetricTemporality, TelemetryOptions, TelemetryOptionsBuilder},
1214
api::{
@@ -25,6 +27,8 @@ use temporal_sdk_core::{
2527
TlsConfig, TunerHolderOptionsBuilder, Url,
2628
};
2729

30+
mod slot_supplier_bridge;
31+
2832
pub enum EphemeralServerConfig {
2933
TestServer(TestServerConfig),
3034
DevServer(TemporalDevServerConfig),
@@ -65,11 +69,11 @@ pub(crate) trait ObjectHandleConversionsExt {
6569
&self,
6670
cx: &mut FunctionContext,
6771
) -> NeonResult<HashMap<String, String>>;
68-
fn as_slot_supplier(
69-
&self,
72+
fn into_slot_supplier<SK: SlotKind + Send + Sync + 'static>(
73+
self,
7074
cx: &mut FunctionContext,
7175
rbo: &mut Option<ResourceBasedSlotsOptions>,
72-
) -> NeonResult<SlotSupplierOptions>;
76+
) -> NeonResult<SlotSupplierOptions<SK>>;
7377
}
7478

7579
impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
@@ -409,18 +413,18 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
409413
if let Some(wf_slot_supp) =
410414
js_optional_getter!(cx, &tuner, "workflowTaskSlotSupplier", JsObject)
411415
{
412-
tuner_holder.workflow_slot_options(wf_slot_supp.as_slot_supplier(cx, &mut rbo)?);
416+
tuner_holder.workflow_slot_options(wf_slot_supp.into_slot_supplier(cx, &mut rbo)?);
413417
}
414418
if let Some(act_slot_supp) =
415419
js_optional_getter!(cx, &tuner, "activityTaskSlotSupplier", JsObject)
416420
{
417-
tuner_holder.activity_slot_options(act_slot_supp.as_slot_supplier(cx, &mut rbo)?);
421+
tuner_holder.activity_slot_options(act_slot_supp.into_slot_supplier(cx, &mut rbo)?);
418422
}
419423
if let Some(local_act_slot_supp) =
420424
js_optional_getter!(cx, &tuner, "localActivityTaskSlotSupplier", JsObject)
421425
{
422426
tuner_holder.local_activity_slot_options(
423-
local_act_slot_supp.as_slot_supplier(cx, &mut rbo)?,
427+
local_act_slot_supp.into_slot_supplier(cx, &mut rbo)?,
424428
);
425429
}
426430
if let Some(rbo) = rbo {
@@ -567,20 +571,20 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
567571
}
568572
}
569573

570-
fn as_slot_supplier(
571-
&self,
574+
fn into_slot_supplier<SK: SlotKind + Send + Sync + 'static>(
575+
self,
572576
cx: &mut FunctionContext,
573577
rbo: &mut Option<ResourceBasedSlotsOptions>,
574-
) -> NeonResult<SlotSupplierOptions> {
575-
match js_value_getter!(cx, self, "type", JsString).as_str() {
578+
) -> NeonResult<SlotSupplierOptions<SK>> {
579+
match js_value_getter!(cx, &self, "type", JsString).as_str() {
576580
"fixed-size" => Ok(SlotSupplierOptions::FixedSize {
577-
slots: js_value_getter!(cx, self, "numSlots", JsNumber) as usize,
581+
slots: js_value_getter!(cx, &self, "numSlots", JsNumber) as usize,
578582
}),
579583
"resource-based" => {
580-
let min_slots = js_value_getter!(cx, self, "minimumSlots", JsNumber);
581-
let max_slots = js_value_getter!(cx, self, "maximumSlots", JsNumber);
582-
let ramp_throttle = js_value_getter!(cx, self, "rampThrottleMs", JsNumber) as u64;
583-
if let Some(tuner_opts) = js_optional_getter!(cx, self, "tunerOptions", JsObject) {
584+
let min_slots = js_value_getter!(cx, &self, "minimumSlots", JsNumber);
585+
let max_slots = js_value_getter!(cx, &self, "maximumSlots", JsNumber);
586+
let ramp_throttle = js_value_getter!(cx, &self, "rampThrottleMs", JsNumber) as u64;
587+
if let Some(tuner_opts) = js_optional_getter!(cx, &self, "tunerOptions", JsObject) {
584588
let target_mem =
585589
js_value_getter!(cx, &tuner_opts, "targetMemoryUsage", JsNumber);
586590
let target_cpu = js_value_getter!(cx, &tuner_opts, "targetCpuUsage", JsNumber);
@@ -603,6 +607,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
603607
),
604608
))
605609
}
610+
"custom" => {
611+
let ssb = SlotSupplierBridge::new(cx, self)?;
612+
Ok(SlotSupplierOptions::Custom(Arc::new(ssb)))
613+
}
606614
_ => cx.throw_type_error("Invalid slot supplier type"),
607615
}
608616
}

0 commit comments

Comments
 (0)