diff --git a/core-c-bridge/Cargo.toml b/core-c-bridge/Cargo.toml index 6c98f4702..54a855e83 100644 --- a/core-c-bridge/Cargo.toml +++ b/core-c-bridge/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [lib] name = "temporal_sdk_core_c_bridge" -crate-type = ["cdylib"] +crate-type = ["cdylib", "staticlib"] [dependencies] anyhow = "1.0" @@ -24,6 +24,7 @@ tokio-util = "0.7" tonic = { workspace = true } tracing = "0.1" url = "2.2" +uuid = { version = "1", features = ["v4"] } [dependencies.temporal-client] path = "../client" diff --git a/core-c-bridge/include/temporal-sdk-core-c-bridge.h b/core-c-bridge/include/temporal-sdk-core-c-bridge.h index 728046a12..caee89351 100644 --- a/core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -5,6 +5,29 @@ #include #include +/** + * Interop mirror of the Rust tonic [tonic::Code] enum + */ +typedef enum TemporalCoreCallbackBasedWorkerClientGRPCCode { + Ok = 0, + Cancelled = 1, + Unknown = 2, + InvalidArgument = 3, + DeadlineExceeded = 4, + NotFound = 5, + AlreadyExists = 6, + PermissionDenied = 7, + ResourceExhausted = 8, + FailedPrecondition = 9, + Aborted = 10, + OutOfRange = 11, + Unimplemented = 12, + Internal = 13, + Unavailable = 14, + DataLoss = 15, + Unauthenticated = 16, +} TemporalCoreCallbackBasedWorkerClientGRPCCode; + typedef enum TemporalCoreForwardedLogLevel { Trace = 0, Debug, @@ -629,6 +652,130 @@ typedef struct TemporalCoreWorkerReplayPushResult { const struct TemporalCoreByteArray *fail; } TemporalCoreWorkerReplayPushResult; +/** + * Interop function pointer signature of the retry predicate of [CallbackBasedWorkerClientPollOptions] + */ +typedef bool (*TemporalCoreCallbackBasedWorkerClientNoRetryPredicate)(void *user_data, + enum TemporalCoreCallbackBasedWorkerClientGRPCCode code); + +/** + * Interop mirror of [temporal_sdk_core::PollOptions] + */ +typedef struct TemporalCoreCallbackBasedWorkerClientPollOptions { + /** + * always non-null (could be empty) + */ + struct TemporalCoreByteArrayRef task_queue; + /** + * true if `no_retry_predicate` is valid + */ + bool has_no_retry; + /** + * Decide short‐circuit GRPC error + */ + TemporalCoreCallbackBasedWorkerClientNoRetryPredicate no_retry_predicate; + /** + * Opaque context pointer passed back to your predicate thunk + */ + void *no_retry_user_data; + /** + * true if `timeout_override.is_some()` + */ + bool has_timeout_ms; + /** + * timeout in milliseconds + */ + uint64_t timeout_ms; +} TemporalCoreCallbackBasedWorkerClientPollOptions; + +/** + * Function pointer for RPC callbacks delivering a `(success, failure)` + * pair as non-owning `ByteArrayRef`s to Rust. + */ +typedef void (*TemporalCoreWorkerClientCallbackTrampoline)(void *user_data, + struct TemporalCoreByteArrayRef success, + struct TemporalCoreByteArrayRef failure); + +/** + * Interop function pointer table implementing the RPC logic of the [temporal_sdk_core::WorkerClient] trait via callbacks. + */ +typedef struct TemporalCoreCallbackBasedWorkerClientRPCs { + void *user_data; + void (*poll_workflow_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*poll_activity_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*poll_nexus_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*complete_workflow_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*complete_activity_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*complete_nexus_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*record_activity_heartbeat)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*cancel_activity_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*fail_activity_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*fail_workflow_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*fail_nexus_task)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*get_workflow_execution_history)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*respond_legacy_query)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*describe_namespace)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_ud); + void (*shutdown_worker)(void *user_data, + struct TemporalCoreByteArrayRef request, + TemporalCoreWorkerClientCallbackTrampoline callback, + void *callback_user_data); + bool (*is_mock)(void *user_data); +} TemporalCoreCallbackBasedWorkerClientRPCs; + +/** + * Interop configuration of the [WorkerClientConfig]. + */ +typedef struct TemporalCoreCallbackBasedWorkerClientConfig { + struct TemporalCoreByteArrayRef client_name; + struct TemporalCoreByteArrayRef client_version; + struct TemporalCoreByteArrayRef identity; +} TemporalCoreCallbackBasedWorkerClientConfig; + #ifdef __cplusplus extern "C" { #endif // __cplusplus @@ -812,6 +959,32 @@ void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id); void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx, void *token_ptr); +/** + * Create a new [Worker] with a custom callback-backed [temporal_sdk_core::WorkerClient] implementation. + * + * # Safety + * - All pointer args (`client_rpcs`, `options`, `client_config`, `runtime`) must be non-null and valid. + * - `system_info_ref.data` must point to at least `system_info_ref.size` bytes if non-zero. + * - Caller retains ownership; these must outlive the call. + * + * # Params + * - `client_rpcs`: pointer to [CallbackBasedWorkerClientRPCs] implementing Core SDK [temporal_sdk_core::WorkerClient] RPCs. + * - `options`: pointer to [WorkerOptions]. + * - `client_config`: pointer to [CallbackBasedWorkerClientConfig] (client name, version, identity). + * - `system_info_ref`: zero-copy [GetSystemInfoResponse] bytes or empty. + * - `runtime`: pointer to [Runtime]. + * + * # Returns + * A [WorkerOrFail] struct containing: + * - `worker`: on success, a heap-allocated [Worker] pointer; null on error. + * - `fail`: on error, a C string (UTF-8) describing the failure; null on success. + */ +struct TemporalCoreWorkerOrFail temporal_core_new_callback_based_worker_client(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs, + const struct TemporalCoreWorkerOptions *options, + const struct TemporalCoreCallbackBasedWorkerClientConfig *client_config, + struct TemporalCoreByteArrayRef system_info_ref, + struct TemporalCoreRuntime *runtime); + #ifdef __cplusplus } // extern "C" #endif // __cplusplus diff --git a/core-c-bridge/src/callback_based_worker_client/client_impl.rs b/core-c-bridge/src/callback_based_worker_client/client_impl.rs new file mode 100644 index 000000000..f8558f236 --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/client_impl.rs @@ -0,0 +1,771 @@ +use std::sync::Arc; + +use temporal_sdk_core::legacy_query_failure; +use temporal_sdk_core_protos::temporal::api::deployment::v1::WorkerDeploymentOptions; +use temporal_sdk_core_protos::temporal::api::nexus; +use temporal_sdk_core_protos::temporal::api::query::v1::WorkflowQueryResult; +use temporal_sdk_core_protos::temporal::api::taskqueue::v1::{TaskQueue, TaskQueueMetadata}; +use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{ + get_system_info_response, respond_workflow_task_completed_request, + GetWorkflowExecutionHistoryRequest, PollActivityTaskQueueRequest, PollNexusTaskQueueRequest, + PollWorkflowTaskQueueRequest, RecordActivityTaskHeartbeatRequest, RespondActivityTaskCanceledRequest, + RespondActivityTaskCompletedRequest, RespondActivityTaskFailedRequest, RespondNexusTaskCompletedRequest, + RespondNexusTaskFailedRequest, RespondQueryTaskCompletedRequest, RespondWorkflowTaskCompletedRequest, + RespondWorkflowTaskFailedRequest, ShutdownWorkerRequest, + PollWorkflowTaskQueueResponse, PollActivityTaskQueueResponse, PollNexusTaskQueueResponse, + RespondWorkflowTaskCompletedResponse, RespondActivityTaskCompletedResponse, RespondNexusTaskCompletedResponse, + RecordActivityTaskHeartbeatResponse, RespondActivityTaskCanceledResponse, RespondActivityTaskFailedResponse, + RespondWorkflowTaskFailedResponse, RespondNexusTaskFailedResponse, GetWorkflowExecutionHistoryResponse, + RespondQueryTaskCompletedResponse, DescribeNamespaceResponse, ShutdownWorkerResponse, + get_system_info_response::Capabilities, +}; +use temporal_sdk_core_protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; +use temporal_sdk_core_protos::temporal::api::failure::v1::Failure; +use temporal_sdk_core_protos::temporal::api::enums::v1::{TaskQueueKind, WorkerVersioningMode, WorkflowTaskFailedCause}; +use tokio::sync::oneshot; +use prost::Message; +use tonic::Status; + +use temporal_sdk_core::{ + LegacyQueryResult, + PollActivityOptions, + PollOptions, + PollWorkflowOptions, + TaskToken, + WorkerClient, + WorkerConfig, + WorkflowTaskCompletion +}; + +use temporal_client::{Client, Namespace, RetryClient, SlotManager}; + +use crate::{ + ByteArrayRef, + ByteArray +}; + +use crate::callback_based_worker_client::interop::CallbackBasedWorkerClientRPCs; +use crate::callback_based_worker_client::interop_types::CallbackBasedWorkerClientPollOptions; +use crate::callback_based_worker_client::trampoline::worker_client_callback_trampoline; + +/// Wrapper that implements the [WorkerClient] trait by using the closures from the [CallbackBasedWorkerClientRPCs] +pub(crate) struct CallbackBasedWorkerClient { + /// Function pointer table implementing the RPC logic of the worker client. + pub(crate) inner: CallbackBasedWorkerClientRPCs, + /// Config of the worker. + pub(crate) worker_config: WorkerConfig, + /// Config of the worker client. + pub(crate) client_config: WorkerClientConfig, + + /// Capabilities as read from the `get_system_info` RPC call made on client startup + pub(crate) capabilities: Option, + /// Registry with workers using this client instance + pub(crate) workers: Arc +} + +/// Configuration of the worker client. +pub(crate) struct WorkerClientConfig { + pub(crate) client_name: String, + pub(crate) client_version: String, + pub(crate) identity: String +} + +impl CallbackBasedWorkerClient { + /// Takes the two ByteArray callbacks from the trampoline (success / failure) and + /// either returns a decoded `Res` or a `Status` error. + fn decode_callback_response( + succ: ByteArray, + fail: ByteArray, + ) -> Result + where + Res: Message + Default, + { + // Success path + if !succ.data.is_null() { + let buf = unsafe { Vec::from_raw_parts(succ.data as *mut u8, succ.size, succ.cap) }; + return Res::decode(&*buf) + .map_err(|e| Status::internal(format!("decode error: {}", e))); + } + + // Failure path: error string + if !fail.data.is_null() { + let msg = unsafe { + String::from_utf8_unchecked( + Vec::from_raw_parts(fail.data as *mut u8, fail.size, fail.cap) + ) + }; + return Err(Status::internal(msg)); + } + + // Both empty → cancelled + Err(Status::cancelled("Cancelled by interop")) + } + + /// Helper to get the worker deployment options. + fn get_deployment_opts(&self) -> WorkerDeploymentOptions { + let deployment_version = self.worker_config.computed_deployment_version(); + + WorkerDeploymentOptions { + deployment_name: deployment_version + .as_ref() + .map(|v| v.deployment_name.clone()) + .unwrap_or_default(), + build_id: deployment_version + .as_ref() + .map(|v| v.build_id.clone()) + .unwrap_or_default(), + worker_versioning_mode: match deployment_version { + Some(_) => WorkerVersioningMode::Versioned as i32, + None => WorkerVersioningMode::Unversioned as i32, + }, + } + } +} + +/// Implementation of the [WorkerClient] trait, delegating the RPC logic to the interop implementation in [CallbackBasedWorkerClientRPCs] +#[async_trait::async_trait] +impl WorkerClient for CallbackBasedWorkerClient { + async fn poll_workflow_task( + &self, + poll_options: PollOptions, + wf_options: PollWorkflowOptions, + ) -> Result { + let task_queue = if let Some(sticky) = wf_options.sticky_queue_name { + TaskQueue { + name: sticky, + kind: TaskQueueKind::Sticky.into(), + normal_name: poll_options.clone().task_queue, + } + } else { + TaskQueue { + name: poll_options.clone().task_queue, + kind: TaskQueueKind::Normal.into(), + normal_name: "".to_string(), + } + }; + + // Build request + #[allow(deprecated)] // want to list all fields explicitly + let req = PollWorkflowTaskQueueRequest { + namespace: self.worker_config.namespace.to_string(), + task_queue: Some(task_queue), + identity: self.client_config.identity.clone(), + binary_checksum: "".to_string(), // deprecated + worker_version_capabilities: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + worker_heartbeat: None, + }; + + // Encode request + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let interop_poll = CallbackBasedWorkerClientPollOptions::from(&poll_options); + + // Set up oneshot channel to interop layer + let (tx, rx) = oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + // Fire the callback with trampoline callback + unsafe { + (self.inner.poll_workflow_task)( + self.inner.user_data, + req_ref, + &interop_poll as *const _, + worker_client_callback_trampoline, + cb_sender, + ) + } + + // Await trampoline's callback pair + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + // Decode callback response + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn poll_activity_task( + &self, + poll_options: PollOptions, + act_options: PollActivityOptions, + ) -> Result { + let interop_poll_options = CallbackBasedWorkerClientPollOptions::from(&poll_options); + + #[allow(deprecated)] + let req = PollActivityTaskQueueRequest { + namespace: self.worker_config.namespace.to_string(), + task_queue: Some(TaskQueue { + name: poll_options.task_queue, + kind: TaskQueueKind::Normal as i32, + normal_name: "".to_string(), + }), + identity: self.client_config.identity.clone(), + task_queue_metadata: act_options.max_tasks_per_sec.map(|tps| TaskQueueMetadata { + max_tasks_per_second: Some(tps), + }), + worker_version_capabilities: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + worker_heartbeat: None, + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.poll_activity_task)( + self.inner.user_data, + req_ref, + &interop_poll_options as *const _, + worker_client_callback_trampoline, + cb_sender, + ) + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn poll_nexus_task( + &self, + poll_options: PollOptions, + ) -> Result { + let interop_poll_options = CallbackBasedWorkerClientPollOptions::from(&poll_options); + + #[allow(deprecated)] + let req = PollNexusTaskQueueRequest { + namespace: self.worker_config.namespace.to_string(), + task_queue: Some(TaskQueue { + name: poll_options.task_queue, + kind: TaskQueueKind::Normal as i32, + normal_name: "".to_string(), + }), + identity: self.client_config.identity.clone(), + worker_version_capabilities: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + worker_heartbeat: None, + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.poll_nexus_task)( + self.inner.user_data, + req_ref, + &interop_poll_options as *const _, + worker_client_callback_trampoline, + cb_sender, + ) + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn complete_workflow_task( + &self, + request: WorkflowTaskCompletion, + ) -> Result { + #[allow(deprecated)] + let req = RespondWorkflowTaskCompletedRequest { + task_token: request.task_token.into(), + commands: request.commands, + messages: request.messages, + identity: self.client_config.identity.clone(), + sticky_attributes: request.sticky_attributes, + return_new_workflow_task: request.return_new_workflow_task, + force_create_new_workflow_task: request.force_create_new_workflow_task, + worker_version_stamp: None, // deprecated + binary_checksum: "".to_string(), // deprecated + query_results: request + .query_responses + .into_iter() + .map(|qr| { + let (id, completed_type, query_result, error_message) = qr.into_components(); + ( + id, + WorkflowQueryResult { + result_type: completed_type as i32, + answer: query_result, + error_message, + failure: None, // TODO: https://github.com/temporalio/sdk-core/issues/867 + }, + ) + }) + .collect(), + namespace: self.worker_config.namespace.to_string(), + sdk_metadata: Some(request.sdk_metadata), + metering_metadata: Some(request.metering_metadata), + capabilities: Some(respond_workflow_task_completed_request::Capabilities { + discard_speculative_workflow_task_with_events: true, + }), + deployment: None, // deprecated + versioning_behavior: request.versioning_behavior.into(), + deployment_options: Some(self.get_deployment_opts()), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.complete_workflow_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ) + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn complete_activity_task( + &self, + task_token: TaskToken, + result: Option, + ) -> Result { + #[allow(deprecated)] + let req = RespondActivityTaskCompletedRequest { + task_token: task_token.0, + result, + identity: self.client_config.identity.clone(), + namespace: self.worker_config.namespace.to_string(), + worker_version: None, // deprecated + deployment: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.complete_activity_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn complete_nexus_task( + &self, + task_token: TaskToken, + response: nexus::v1::Response, + ) -> Result { + let req = RespondNexusTaskCompletedRequest { + namespace: self.worker_config.namespace.to_string(), + identity: self.client_config.identity.clone(), + task_token: task_token.0, + response: Some(response), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.complete_nexus_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn record_activity_heartbeat( + &self, + task_token: TaskToken, + details: Option, + ) -> Result { + let req = RecordActivityTaskHeartbeatRequest { + task_token: task_token.0, + details, + identity: self.client_config.identity.clone(), + namespace: self.worker_config.namespace.to_string(), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.record_activity_heartbeat)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + + async fn cancel_activity_task( + &self, + task_token: TaskToken, + details: Option, + ) -> Result { + #[allow(deprecated)] + let req = RespondActivityTaskCanceledRequest { + task_token: task_token.0, + details, + identity: self.client_config.identity.clone(), + namespace: self.worker_config.namespace.to_string(), + worker_version: None, // deprecated + deployment: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.cancel_activity_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn fail_activity_task( + &self, + task_token: TaskToken, + failure: Option, + ) -> Result { + #[allow(deprecated)] + let req = RespondActivityTaskFailedRequest { + task_token: task_token.0, + failure, + identity: self.client_config.identity.clone(), + namespace: self.worker_config.namespace.to_string(), + last_heartbeat_details: None, // TODO: https://github.com/temporalio/sdk-core/issues/293 + worker_version: None, // deprecated + deployment: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.fail_activity_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ) + } + + let (succ, fail) = rx + .await + .map_err(|_| tonic::Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn fail_workflow_task( + &self, + task_token: TaskToken, + cause: WorkflowTaskFailedCause, + failure: Option, + ) -> Result { + #[allow(deprecated)] + let req = RespondWorkflowTaskFailedRequest { + task_token: task_token.0, + cause: cause as i32, + failure, + identity: self.client_config.identity.clone(), + binary_checksum: "".to_string(), // deprecated + namespace: self.worker_config.namespace.to_string(), + messages: vec![], + worker_version: None, // deprecated + deployment: None, // deprecated + deployment_options: Some(self.get_deployment_opts()), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.fail_workflow_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ) + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn fail_nexus_task( + &self, + task_token: TaskToken, + error: nexus::v1::HandlerError, + ) -> Result { + let req = RespondNexusTaskFailedRequest { + namespace: self.worker_config.namespace.to_string(), + identity: self.client_config.identity.clone(), + task_token: task_token.0, + error: Some(error), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.fail_nexus_task)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn get_workflow_execution_history( + &self, + workflow_id: String, + run_id: Option, + page_token: Vec, + ) -> Result { + let req = GetWorkflowExecutionHistoryRequest { + namespace: self.worker_config.namespace.to_string(), + execution: Some(WorkflowExecution { + workflow_id, + run_id: run_id.unwrap_or_default(), + }), + next_page_token: page_token, + ..Default::default() + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.get_workflow_execution_history)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn respond_legacy_query( + &self, + task_token: TaskToken, + query_result: LegacyQueryResult, + ) -> Result { + let mut failure = None; + let (query_result, cause) = match query_result { + LegacyQueryResult::Succeeded(s) => (s, WorkflowTaskFailedCause::Unspecified), + #[allow(deprecated)] + LegacyQueryResult::Failed(f) => { + let cause = f.force_cause(); + failure = f.failure.clone(); + (legacy_query_failure(f), cause) + } + }; + let (_, completed_type, query_result, error_message) = query_result.into_components(); + + let req = RespondQueryTaskCompletedRequest { + task_token: task_token.into(), + completed_type: completed_type as i32, + query_result, + error_message, + namespace: self.worker_config.namespace.to_string(), + failure, + cause: cause.into(), + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.respond_legacy_query)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn describe_namespace(&self) -> Result { + let req = Namespace::Name(self.worker_config.namespace.to_string()).into_describe_namespace_request(); + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.describe_namespace)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + async fn shutdown_worker( + &self, + sticky_task_queue: String, + ) -> Result { + let req = ShutdownWorkerRequest { + namespace: self.worker_config.namespace.to_string(), + identity: self.client_config.identity.clone(), + sticky_task_queue, + reason: "graceful shutdown".to_string(), + worker_heartbeat: None, + }; + + let req_vec = req.encode_to_vec(); + let req_ref = ByteArrayRef { data: req_vec.as_ptr(), size: req_vec.len() }; + + let (tx, rx) = tokio::sync::oneshot::channel::<(ByteArray, ByteArray)>(); + let cb_sender = Box::into_raw(Box::new(tx)) as *mut libc::c_void; + + unsafe { + (self.inner.shutdown_worker)( + self.inner.user_data, + req_ref, + worker_client_callback_trampoline, + cb_sender, + ); + } + + let (succ, fail) = rx + .await + .map_err(|_| Status::internal("Interop trampoline never called back"))?; + + CallbackBasedWorkerClient::decode_callback_response(succ, fail) + } + + fn replace_client(&self, _: RetryClient) { + unimplemented!("Replacing client not supported for CallbackBasedWorkerClient"); + } + + fn capabilities(&self) -> Option { + self.capabilities + } + + fn workers(&self) -> Arc { + self.workers.clone() + } + + fn is_mock(&self) -> bool { + // Call sync closure + unsafe { (self.inner.is_mock)(self.inner.user_data) } + } + + fn sdk_name_and_version(&self) -> (String, String) { + (self.client_config.client_name.clone(), self.client_config.client_version.clone()) + } +} + +type Result = std::result::Result; \ No newline at end of file diff --git a/core-c-bridge/src/callback_based_worker_client/conversions.rs b/core-c-bridge/src/callback_based_worker_client/conversions.rs new file mode 100644 index 000000000..1c5867a29 --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/conversions.rs @@ -0,0 +1,127 @@ +use std::ffi::c_void; +use tonic::{Code, Status}; + +use temporal_sdk_core::{ + PollActivityOptions, + PollOptions, + PollWorkflowOptions, +}; + +use crate::ByteArrayRef; + +use crate::callback_based_worker_client::interop_types::{ + CallbackBasedWorkerClientGRPCCode, + CallbackBasedWorkerClientNoRetryPredicate, + CallbackBasedWorkerClientPollActivityOptions, + CallbackBasedWorkerClientPollOptions, + CallbackBasedWorkerClientPollWorkflowOptions, + CallbackBasedWorkerClientConfig +}; +use crate::callback_based_worker_client::client_impl::WorkerClientConfig; + +/// Convert interop [CallbackBasedWorkerClientConfig] to [WorkerClientConfig] +impl From<&CallbackBasedWorkerClientConfig> for WorkerClientConfig { + fn from(config: &CallbackBasedWorkerClientConfig) -> Self { + WorkerClientConfig { + client_name: config.client_name.to_string(), + client_version: config.client_version.to_string(), + identity: config.identity.to_string(), + } + } +} + +// Convert Rust Core SDK options → interop structs +impl From<&PollOptions> for CallbackBasedWorkerClientPollOptions { + fn from(o: &PollOptions) -> Self { + let task_queue = ByteArrayRef::from(o.task_queue.as_str()); + + // timeout + let (has_timeout_ms, timeout_ms) = o.timeout_override + .map(|d| (true, d.as_millis() as u64)) + .unwrap_or((false, 0)); + + // retry predicate + let (has_no_retry, no_retry_predicate, no_retry_user_data) = if let Some(nr) = &o.no_retry { + extern "C" fn thunk(user_data: *mut c_void, code: CallbackBasedWorkerClientGRPCCode) -> bool { + // reconstruct the Rust predicate + let f: fn(&Status) -> bool = unsafe { std::mem::transmute(user_data) }; + let st = Status::new(Code::from(code), ""); + f(&st) + } + let fptr: fn(&Status) -> bool = nr.predicate; + (true, thunk as CallbackBasedWorkerClientNoRetryPredicate, fptr as *mut _) + } else { + (false, dummy_predicate as CallbackBasedWorkerClientNoRetryPredicate, std::ptr::null_mut()) + }; + + CallbackBasedWorkerClientPollOptions { + task_queue, + has_no_retry, + no_retry_predicate, + no_retry_user_data, + has_timeout_ms, + timeout_ms, + } + } +} + +unsafe extern "C" fn dummy_predicate(_: *mut c_void, _: CallbackBasedWorkerClientGRPCCode) -> bool { + false +} + +impl From<&PollWorkflowOptions> for CallbackBasedWorkerClientPollWorkflowOptions { + fn from(o: &PollWorkflowOptions) -> Self { + if let Some(name) = &o.sticky_queue_name { + CallbackBasedWorkerClientPollWorkflowOptions { + has_sticky_queue_name: true, + sticky_queue_name: ByteArrayRef::from(name.as_str()), + } + } else { + CallbackBasedWorkerClientPollWorkflowOptions { + has_sticky_queue_name: false, + sticky_queue_name: ByteArrayRef::empty(), + } + } + } +} + +impl From<&PollActivityOptions> for CallbackBasedWorkerClientPollActivityOptions { + fn from(o: &PollActivityOptions) -> Self { + if let Some(val) = o.max_tasks_per_sec { + CallbackBasedWorkerClientPollActivityOptions { + has_max_tasks_per_sec: true, + max_tasks_per_sec: val, + } + } else { + CallbackBasedWorkerClientPollActivityOptions { + has_max_tasks_per_sec: false, + max_tasks_per_sec: 0.0, + } + } + } +} + +// Convert Interop structs → Rust Tonic Code struct +impl From for Code { + fn from(c: CallbackBasedWorkerClientGRPCCode) -> Self { + match c { + CallbackBasedWorkerClientGRPCCode::Ok => Code::Ok, + CallbackBasedWorkerClientGRPCCode::Cancelled => Code::Cancelled, + CallbackBasedWorkerClientGRPCCode::Unknown => Code::Unknown, + CallbackBasedWorkerClientGRPCCode::InvalidArgument => Code::InvalidArgument, + CallbackBasedWorkerClientGRPCCode::DeadlineExceeded => Code::DeadlineExceeded, + CallbackBasedWorkerClientGRPCCode::NotFound => Code::NotFound, + CallbackBasedWorkerClientGRPCCode::AlreadyExists => Code::AlreadyExists, + CallbackBasedWorkerClientGRPCCode::PermissionDenied => Code::PermissionDenied, + CallbackBasedWorkerClientGRPCCode::ResourceExhausted => Code::ResourceExhausted, + CallbackBasedWorkerClientGRPCCode::FailedPrecondition => Code::FailedPrecondition, + CallbackBasedWorkerClientGRPCCode::Aborted => Code::Aborted, + CallbackBasedWorkerClientGRPCCode::OutOfRange => Code::OutOfRange, + CallbackBasedWorkerClientGRPCCode::Unimplemented => Code::Unimplemented, + CallbackBasedWorkerClientGRPCCode::Internal => Code::Internal, + CallbackBasedWorkerClientGRPCCode::Unavailable => Code::Unavailable, + CallbackBasedWorkerClientGRPCCode::DataLoss => Code::DataLoss, + CallbackBasedWorkerClientGRPCCode::Unauthenticated => Code::Unauthenticated, + } + } +} \ No newline at end of file diff --git a/core-c-bridge/src/callback_based_worker_client/interop.rs b/core-c-bridge/src/callback_based_worker_client/interop.rs new file mode 100644 index 000000000..5544945fe --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/interop.rs @@ -0,0 +1,264 @@ +use libc::c_void; +use prost::Message; +use temporal_client::SlotManager; +use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{get_system_info_response, GetSystemInfoResponse}; +use uuid::Uuid; +use std::sync::Arc; + +use crate::{ + ByteArrayRef, + enter_sync, + runtime::Runtime, + worker::WorkerOptions, + worker::WorkerOrFail, + worker::Worker +}; + +use crate::callback_based_worker_client::client_impl::{CallbackBasedWorkerClient, WorkerClientConfig}; +use crate::callback_based_worker_client::interop_types::{ + CallbackBasedWorkerClientPollOptions, + CallbackBasedWorkerClientConfig +}; +use crate::callback_based_worker_client::trampoline::WorkerClientCallbackTrampoline; + +/// Interop function pointer table implementing the RPC logic of the [temporal_sdk_core::WorkerClient] trait via callbacks. +#[repr(C)] +#[derive(Clone, Copy)] +pub struct CallbackBasedWorkerClientRPCs { + pub user_data: *mut c_void, + + pub poll_workflow_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + poll_opts: *const CallbackBasedWorkerClientPollOptions, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub poll_activity_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + poll_opts: *const CallbackBasedWorkerClientPollOptions, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub poll_nexus_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + poll_opts: *const CallbackBasedWorkerClientPollOptions, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub complete_workflow_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub complete_activity_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub complete_nexus_task: unsafe extern "C" fn( + user_data: *mut c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut c_void, + ), + + pub record_activity_heartbeat: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub cancel_activity_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub fail_activity_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub fail_workflow_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub fail_nexus_task: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub get_workflow_execution_history: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub respond_legacy_query: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub describe_namespace: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_ud: *mut libc::c_void, + ), + + pub shutdown_worker: unsafe extern "C" fn( + user_data: *mut libc::c_void, + request: ByteArrayRef, + callback: WorkerClientCallbackTrampoline, + callback_user_data: *mut libc::c_void, + ), + + pub is_mock: unsafe extern "C" fn( + user_data: *mut libc::c_void, + ) -> bool, + + // The [temporal_sdk_core::WorkerClient] `workers()`, `capabilities()`, and `sdk_name_and_version()` functions are purely implemented in Rust, no interop implementation is offered. + // The [temporal_sdk_core::WorkerClient] `replace_client()` function is not supported by the [CallbackBasedWorkerClient]. +} + +unsafe impl Send for CallbackBasedWorkerClientRPCs {} +unsafe impl Sync for CallbackBasedWorkerClientRPCs {} + +/// Create a new [Worker] with a custom callback-backed [temporal_sdk_core::WorkerClient] implementation. +/// +/// # Safety +/// - All pointer args (`client_rpcs`, `options`, `client_config`, `runtime`) must be non-null and valid. +/// - `system_info_ref.data` must point to at least `system_info_ref.size` bytes if non-zero. +/// - Caller retains ownership; these must outlive the call. +/// +/// # Params +/// - `client_rpcs`: pointer to [CallbackBasedWorkerClientRPCs] implementing Core SDK [temporal_sdk_core::WorkerClient] RPCs. +/// - `options`: pointer to [WorkerOptions]. +/// - `client_config`: pointer to [CallbackBasedWorkerClientConfig] (client name, version, identity). +/// - `system_info_ref`: zero-copy [GetSystemInfoResponse] bytes or empty. +/// - `runtime`: pointer to [Runtime]. +/// +/// # Returns +/// A [WorkerOrFail] struct containing: +/// - `worker`: on success, a heap-allocated [Worker] pointer; null on error. +/// - `fail`: on error, a C string (UTF-8) describing the failure; null on success. +#[unsafe(no_mangle)] +pub extern "C" fn temporal_core_new_callback_based_worker_client( + client_rpcs: *const CallbackBasedWorkerClientRPCs, + options: *const WorkerOptions, + client_config: *const CallbackBasedWorkerClientConfig, + system_info_ref: ByteArrayRef, + runtime: *mut Runtime, +) -> WorkerOrFail { + // Caller must guarantee non-null, valid pointers + let (client_rpcs, options, client_config, runtime) = unsafe { + ( + *client_rpcs, + &*options, + &*client_config, + &mut *runtime + ) + }; + + let client_config: WorkerClientConfig = client_config.into(); + + // Create unique sticky queue name for a worker, iff the config allows for 1 or more cached workflows + let sticky_queue_name = if options.max_cached_workflows > 0 { + Some(format!( + "{}-{}", + client_config.identity, + Uuid::new_v4().simple() + )) + } else { + None + }; + + // Decode optional system_info zero-copy and return capabilities + let capabilities: Option = if system_info_ref.size == 0 { + None + } else { + let buf = unsafe { + std::slice::from_raw_parts( + system_info_ref.data, + system_info_ref.size, + ) + }; + let resp = match GetSystemInfoResponse::decode(buf) { + Ok(resp) => resp, + Err(err) => { + let err_ptr = runtime + .alloc_utf8(&format!("GetSystemInfoResponse decode error: {}", err)) + .into_raw() + .cast_const(); + return WorkerOrFail { + worker: std::ptr::null_mut(), + fail: err_ptr, + }; + } + }; + resp.capabilities + }; + + enter_sync!(runtime); + + // Convert interop options to Core SDK [temporal_sdk_core::WorkerConfig] + let worker_config: temporal_sdk_core::WorkerConfig = match options.try_into() { + Err(err) => { + let err_ptr = runtime + .alloc_utf8(&format!("Invalid options: {}", err)) + .into_raw() + .cast_const(); + return WorkerOrFail { + worker: std::ptr::null_mut(), + fail: err_ptr, + }; + } + Ok(cfg) => cfg, + }; + + // Create the Core SDK [temporal_sdk_core::Worker] + let core_worker = temporal_sdk_core::Worker::new( + worker_config.clone(), + sticky_queue_name, + Arc::new(CallbackBasedWorkerClient { + inner: client_rpcs, // RPCs implemented via the [CallbackBasedWorkerClientRPCs] + worker_config: worker_config.clone(), + client_config: client_config, + capabilities: capabilities, + workers: Arc::new(SlotManager::new()) + }), + None, // no telemetry + ); + + // Wrap it in our Bridge Worker and heap‐allocate + let bridge_worker = Worker { + worker: Some(Arc::new(core_worker)), + runtime: runtime.clone(), + }; + WorkerOrFail { + worker: Box::into_raw(Box::new(bridge_worker)), + fail: std::ptr::null_mut(), + } +} \ No newline at end of file diff --git a/core-c-bridge/src/callback_based_worker_client/interop_types.rs b/core-c-bridge/src/callback_based_worker_client/interop_types.rs new file mode 100644 index 000000000..ca5f77604 --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/interop_types.rs @@ -0,0 +1,78 @@ +use std::ffi::c_void; +use crate::ByteArrayRef; + +/// Interop configuration of the [WorkerClientConfig]. +#[repr(C)] +pub struct CallbackBasedWorkerClientConfig { + pub client_name: ByteArrayRef, + pub client_version: ByteArrayRef, + pub identity: ByteArrayRef +} + +/// Interop mirror of [temporal_sdk_core::PollOptions] +#[repr(C)] +pub struct CallbackBasedWorkerClientPollOptions { + /// always non-null (could be empty) + pub task_queue: ByteArrayRef, + + /// true if `no_retry_predicate` is valid + pub has_no_retry: bool, + /// Decide short‐circuit GRPC error + pub no_retry_predicate: CallbackBasedWorkerClientNoRetryPredicate, + /// Opaque context pointer passed back to your predicate thunk + pub no_retry_user_data: *mut c_void, + + /// true if `timeout_override.is_some()` + pub has_timeout_ms: bool, + /// timeout in milliseconds + pub timeout_ms: u64, +} + +unsafe impl Send for CallbackBasedWorkerClientPollOptions {} +unsafe impl Sync for CallbackBasedWorkerClientPollOptions {} + +/// Interop mirror of [temporal_sdk_core::PollWorkflowOptions] +#[repr(C)] +pub struct CallbackBasedWorkerClientPollWorkflowOptions { + /// true if `sticky_queue_name.is_some()` + pub has_sticky_queue_name: bool, + /// the inner string (only valid if above true) + pub sticky_queue_name: ByteArrayRef, +} + +unsafe impl Send for CallbackBasedWorkerClientPollWorkflowOptions {} +unsafe impl Sync for CallbackBasedWorkerClientPollWorkflowOptions {} + +/// Interop mirror of [temporal_sdk_core::PollActivityOptions] +#[repr(C)] +pub struct CallbackBasedWorkerClientPollActivityOptions { + /// true if `max_tasks_per_sec.is_some()` + pub has_max_tasks_per_sec: bool, + /// the inner `f64` value + pub max_tasks_per_sec: f64, +} + +/// Interop mirror of the Rust tonic [tonic::Code] enum +#[repr(C)] +pub enum CallbackBasedWorkerClientGRPCCode { + Ok = 0, + Cancelled = 1, + Unknown = 2, + InvalidArgument = 3, + DeadlineExceeded = 4, + NotFound = 5, + AlreadyExists = 6, + PermissionDenied = 7, + ResourceExhausted = 8, + FailedPrecondition = 9, + Aborted = 10, + OutOfRange = 11, + Unimplemented = 12, + Internal = 13, + Unavailable = 14, + DataLoss = 15, + Unauthenticated = 16, +} + +/// Interop function pointer signature of the retry predicate of [CallbackBasedWorkerClientPollOptions] +pub type CallbackBasedWorkerClientNoRetryPredicate = unsafe extern "C" fn(user_data: *mut c_void, code: CallbackBasedWorkerClientGRPCCode) -> bool; \ No newline at end of file diff --git a/core-c-bridge/src/callback_based_worker_client/mod.rs b/core-c-bridge/src/callback_based_worker_client/mod.rs new file mode 100644 index 000000000..988420b90 --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/mod.rs @@ -0,0 +1,7 @@ +mod conversions; +mod client_impl; +mod interop; +mod interop_types; +mod trampoline; + +pub use interop::{temporal_core_new_callback_based_worker_client, CallbackBasedWorkerClientRPCs}; \ No newline at end of file diff --git a/core-c-bridge/src/callback_based_worker_client/trampoline.rs b/core-c-bridge/src/callback_based_worker_client/trampoline.rs new file mode 100644 index 000000000..1ef88667d --- /dev/null +++ b/core-c-bridge/src/callback_based_worker_client/trampoline.rs @@ -0,0 +1,58 @@ +use crate::{ByteArrayRef, ByteArray}; + +/// Function pointer for RPC callbacks delivering a `(success, failure)` +/// pair as non-owning `ByteArrayRef`s to Rust. +pub(crate) type WorkerClientCallbackTrampoline = unsafe extern "C" fn( + user_data: *mut libc::c_void, + success: ByteArrayRef, + failure: ByteArrayRef, +); + +/// Interop-triggered callback trampoline. +/// +/// When an interop caller completes an RPC, it invokes this function +/// with two non-owning `ByteArrayRef`s (`success_ref` and `failure_ref`) plus +/// a raw `callback_ud` token that was originally created as +/// `Box::into_raw(Box::new(tx))`. +/// +/// This shim: +/// 1. Reclaims the boxed `oneshot::Sender<(ByteArray, ByteArray)>` from `callback_ud`. +/// 2. Deep-copies each `ByteArrayRef` into an owned `ByteArray`. +/// 3. Sends the `(success, failure)` pair through the channel, waking the awaiting task. +/// +/// # Safety +/// - `callback_ud` must come from `Box::into_raw(Box::new(tx))`. +/// - `success_ref.data` and `failure_ref.data` must point to `size` valid bytes. +pub(crate) unsafe extern "C" fn worker_client_callback_trampoline( + callback_ud: *mut libc::c_void, + success_ref: ByteArrayRef, + failure_ref: ByteArrayRef, +) { + // Recover our oneshot sender + let tx: Box> = + unsafe { Box::from_raw(callback_ud as *mut _) }; + + // Deep‐copy a ByteArrayRef into an owning ByteArray + fn into_owned(r: ByteArrayRef) -> ByteArray { + if r.data.is_null() { // must not check for size == 0 here, there could be empty, valid responses from Temporal + // no data → empty, non‐freeing ByteArray + ByteArray { + data: std::ptr::null(), + size: 0, + cap: 0, + disable_free: true, // disable free as otherwise leads to double-free from Oneshot Channel and drop after `worker_client_callback_trampoline` + } + } else { + // SAFETY: the interop side promises `r.data` is valid for `r.size` bytes + let slice: &[u8] = unsafe { std::slice::from_raw_parts(r.data, r.size) }; + let vec = slice.to_vec(); // deep‐copy + ByteArray::from_vec_disable_free(vec) // disable free as otherwise leads to double-free from Oneshot Channel and drop after `worker_client_callback_trampoline` + } + } + + let succ = into_owned(success_ref); + let fail = into_owned(failure_ref); + + // Send the pair back down the channel + let _ = tx.send((succ, fail)); +} \ No newline at end of file diff --git a/core-c-bridge/src/lib.rs b/core-c-bridge/src/lib.rs index a4427f3b0..df5c2eb74 100644 --- a/core-c-bridge/src/lib.rs +++ b/core-c-bridge/src/lib.rs @@ -202,7 +202,7 @@ impl Drop for ByteArray { // drop-but-not-freed situation though we don't expect any), the bytes // remain non-null so we re-own them here. See "byte_array_free" in // runtime.rs. - if !self.data.is_null() { + if !self.data.is_null() && !self.disable_free { unsafe { Vec::from_raw_parts(self.data as *mut u8, self.size, self.cap) }; } } diff --git a/core-c-bridge/src/worker.rs b/core-c-bridge/src/worker.rs index 024dcb68e..8c6133bca 100644 --- a/core-c-bridge/src/worker.rs +++ b/core-c-bridge/src/worker.rs @@ -391,8 +391,8 @@ pub struct ResourceBasedTunerOptions { #[derive(Clone)] pub struct Worker { - worker: Option>, - runtime: Runtime, + pub(crate) worker: Option>, + pub(crate) runtime: Runtime, } /// Only runtime or fail will be non-null. Whichever is must be freed when done. @@ -418,6 +418,7 @@ pub struct WorkerReplayPushResult { pub fail: *const ByteArray, } +#[macro_export] macro_rules! enter_sync { ($runtime:expr) => { if let Some(subscriber) = $runtime.core.telemetry().trace_subscriber() { diff --git a/core/src/lib.rs b/core/src/lib.rs index 8e43bf82c..84501101e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -19,10 +19,12 @@ pub mod ephemeral_server; mod internal_flags; mod pollers; mod protosext; +pub use protosext::legacy_query_failure; pub mod replay; pub(crate) mod retry_logic; pub mod telemetry; mod worker; +pub use worker::client::LegacyQueryResult; #[cfg(test)] mod core_tests; diff --git a/core/src/protosext/mod.rs b/core/src/protosext/mod.rs index 8f67c7486..131265537 100644 --- a/core/src/protosext/mod.rs +++ b/core/src/protosext/mod.rs @@ -153,7 +153,7 @@ impl WorkflowActivationExt for WorkflowActivation { } /// Create a legacy query failure result -pub(crate) fn legacy_query_failure(fail: workflow_completion::Failure) -> QueryResult { +pub fn legacy_query_failure(fail: workflow_completion::Failure) -> QueryResult { QueryResult { query_id: LEGACY_QUERY_ID.to_string(), variant: Some(query_result::Variant::Failed( diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 2b401e204..7d1afea66 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -35,8 +35,11 @@ use tonic::IntoRequest; type Result = std::result::Result; +/// Legacy-version of [QueryResult] containing a seperate failure case pub enum LegacyQueryResult { + /// Success case containing the [QueryResult] Succeeded(QueryResult), + /// Failure case containing the [workflow_completion::Failure] Failed(workflow_completion::Failure), }