Introduce a new callback based bridge worker #955

Open
wants to merge 1 commit into
base: master
3 changes: 2 additions & 1 deletion core-c-bridge/Cargo.toml
@@ -5,7 +5,7 @@ edition = "2024"

[lib]
name = "temporal_sdk_core_c_bridge"
- crate-type = ["cdylib"]
+ crate-type = ["cdylib", "staticlib"]

[dependencies]
anyhow = "1.0"
@@ -24,6 +24,7 @@ tokio-util = "0.7"
tonic = { workspace = true }
tracing = "0.1"
url = "2.2"
uuid = { version = "1", features = ["v4"] }

[dependencies.temporal-client]
path = "../client"
173 changes: 173 additions & 0 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
@@ -5,6 +5,29 @@
#include <stdint.h>
#include <stdlib.h>

/**
* Interop mirror of the Rust tonic [tonic::Code] enum
*/
typedef enum TemporalCoreCallbackBasedWorkerClientGRPCCode {
Ok = 0,
Cancelled = 1,
Unknown = 2,
InvalidArgument = 3,
DeadlineExceeded = 4,
NotFound = 5,
AlreadyExists = 6,
PermissionDenied = 7,
ResourceExhausted = 8,
FailedPrecondition = 9,
Aborted = 10,
OutOfRange = 11,
Unimplemented = 12,
Internal = 13,
Unavailable = 14,
DataLoss = 15,
Unauthenticated = 16,
} TemporalCoreCallbackBasedWorkerClientGRPCCode;

typedef enum TemporalCoreForwardedLogLevel {
Trace = 0,
Debug,
@@ -629,6 +652,130 @@ typedef struct TemporalCoreWorkerReplayPushResult {
const struct TemporalCoreByteArray *fail;
} TemporalCoreWorkerReplayPushResult;

/**
* Interop function pointer signature of the retry predicate of [CallbackBasedWorkerClientPollOptions]
*/
typedef bool (*TemporalCoreCallbackBasedWorkerClientNoRetryPredicate)(void *user_data,
enum TemporalCoreCallbackBasedWorkerClientGRPCCode code);

/**
* Interop mirror of [temporal_sdk_core::PollOptions]
*/
typedef struct TemporalCoreCallbackBasedWorkerClientPollOptions {
/**
* always non-null (could be empty)
*/
struct TemporalCoreByteArrayRef task_queue;
/**
* true if `no_retry_predicate` is valid
*/
bool has_no_retry;
/**
* Decide short‐circuit GRPC error
*/
TemporalCoreCallbackBasedWorkerClientNoRetryPredicate no_retry_predicate;
/**
* Opaque context pointer passed back to your predicate thunk
*/
void *no_retry_user_data;
/**
* true if `timeout_override.is_some()`
*/
bool has_timeout_ms;
/**
* timeout in milliseconds
*/
uint64_t timeout_ms;
} TemporalCoreCallbackBasedWorkerClientPollOptions;
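
A minimal sketch of how an implementer might read these poll options, offered as an illustration rather than the PR's intended behavior. The types below are trimmed local stand-ins for the declarations in this diff, with shortened names. `DEFAULT_POLL_TIMEOUT_MS`, `effective_timeout_ms`, `should_retry`, and `no_retry_on_resource_exhausted` are hypothetical helpers. It also assumes that the predicate returning `true` means "do not retry this error", which the header comment does not state explicitly.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-in for a few values of the gRPC code enum in this diff. */
typedef enum GrpcCode {
  CODE_OK = 0,
  CODE_RESOURCE_EXHAUSTED = 8,
  CODE_UNAVAILABLE = 14
} GrpcCode;

typedef bool (*NoRetryPredicate)(void *user_data, GrpcCode code);

/* Local stand-in for the poll options struct, without task_queue. */
typedef struct PollOptions {
  bool has_no_retry;
  NoRetryPredicate no_retry_predicate;
  void *no_retry_user_data;
  bool has_timeout_ms;
  uint64_t timeout_ms;
} PollOptions;

/* Hypothetical fallback used when no override is supplied. */
#define DEFAULT_POLL_TIMEOUT_MS 70000u

/* Use the override only when its presence flag is set. */
uint64_t effective_timeout_ms(const PollOptions *opts) {
  assert(opts != NULL);
  return opts->has_timeout_ms ? opts->timeout_ms : DEFAULT_POLL_TIMEOUT_MS;
}

/* Assumption: predicate == true means the error must not be retried. */
bool should_retry(const PollOptions *opts, GrpcCode code) {
  assert(opts != NULL);
  if (code == CODE_OK) {
    return false; /* nothing to retry */
  }
  if (opts->has_no_retry && opts->no_retry_predicate != NULL) {
    return !opts->no_retry_predicate(opts->no_retry_user_data, code);
  }
  return true;
}

/* Example predicate: give up immediately on ResourceExhausted. */
bool no_retry_on_resource_exhausted(void *user_data, GrpcCode code) {
  (void)user_data;
  return code == CODE_RESOURCE_EXHAUSTED;
}
```

The `has_*` flags play the role of Rust's `Option`: the paired field is only meaningful when its flag is `true`, so both helpers check the flag before touching the value.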

/**
* Function pointer for RPC callbacks delivering a `(success, failure)`
* pair as non-owning `ByteArrayRef`s to Rust.
*/
typedef void (*TemporalCoreWorkerClientCallbackTrampoline)(void *user_data,
Member

Suggested change
- typedef void (*TemporalCoreWorkerClientCallbackTrampoline)(void *user_data,
+ typedef void (*TemporalCoreWorkerClientResultCallback)(void *user_data,

Pedantic, but we have not traditionally called this a trampoline in this C header. Can it just be called a callback?

struct TemporalCoreByteArrayRef success,
struct TemporalCoreByteArrayRef failure);

/**
* Interop function pointer table implementing the RPC logic of the [temporal_sdk_core::WorkerClient] trait via callbacks.
*/
typedef struct TemporalCoreCallbackBasedWorkerClientRPCs {
Member

@cretz Jul 9, 2025

So, thinking on this, I wonder if we should just have a general lower-level callback-based client and nothing specific to workers: basically a C abstraction for a gRPC client.

So basically have a temporal_core_client_from_callbacks_new that accepts some options, including the single function called back for every gRPC request (so the RPC name is one of the params), and returns a pointer to TemporalCoreClient (or rather a struct that has that or fail as mutually exclusive). That client can then be used for temporal_core_worker_new, can be passed to temporal_core_client_free, etc. The API would be cleaner/simpler and the implementer does not have to be aware of retry or worker details or any of that.

I know we had mentioned this approach at #883, but instead at #924 we're exposing the single worker client trait. I think it'd be wiser to make a C bridge for the gRPC client in general. This would allow our SDKs to let people replace the gRPC implementation at client creation time instead of worker creation time. It would also allow our SDKs to provide gRPC interception if they wanted (by delegating to another client), though we could add a bit of sugar to make delegating to the default implementation easier at that time.

Thoughts @FranzBusch and @Sushisource?
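
To make the proposal above concrete, here is a rough sketch of what a single-callback client surface could look like. None of these declarations exist in the header: `GrpcCallFn`, `ClientFromCallbacksOptions`, `example_call`, and the helpers are hypothetical names, `ByteArrayRef` and `ResultCallback` are local stand-ins for the bridge types, and the echo behavior only stands in for a real transport.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local stand-in for the bridge's non-owning byte slice. */
typedef struct ByteArrayRef {
  const uint8_t *data;
  size_t size;
} ByteArrayRef;

typedef void (*ResultCallback)(void *user_data, ByteArrayRef success,
                               ByteArrayRef failure);

/* Hypothetical: one function invoked for every gRPC request,
 * with the RPC name passed as a parameter. */
typedef void (*GrpcCallFn)(void *user_data, ByteArrayRef rpc_name,
                           ByteArrayRef request, ResultCallback callback,
                           void *callback_user_data);

/* Hypothetical options for a callback-backed client constructor. */
typedef struct ClientFromCallbacksOptions {
  void *user_data;
  GrpcCallFn call;
} ClientFromCallbacksOptions;

ByteArrayRef ref_from_cstr(const char *s) {
  assert(s != NULL);
  ByteArrayRef r = {(const uint8_t *)s, strlen(s)};
  return r;
}

bool ref_equals_cstr(ByteArrayRef r, const char *s) {
  size_t n = strlen(s);
  return r.size == n && (n == 0 || memcmp(r.data, s, n) == 0);
}

/* Example implementation: dispatch on the RPC name. A real one would
 * forward `request` over its own transport; this one echoes it back. */
void example_call(void *user_data, ByteArrayRef rpc_name, ByteArrayRef request,
                  ResultCallback callback, void *callback_user_data) {
  static const uint8_t unimplemented[] = "unimplemented";
  ByteArrayRef empty = {NULL, 0};
  (void)user_data;
  assert(callback != NULL);
  if (ref_equals_cstr(rpc_name, "PollWorkflowTaskQueue")) {
    callback(callback_user_data, request, empty);
  } else {
    ByteArrayRef failure = {unimplemented, sizeof unimplemented - 1};
    callback(callback_user_data, empty, failure);
  }
}

/* Records what the result callback received. */
typedef struct Captured {
  int calls;
  size_t success_size;
  size_t failure_size;
} Captured;

void capture_result(void *user_data, ByteArrayRef success, ByteArrayRef failure) {
  Captured *c = user_data;
  c->calls++;
  c->success_size = success.size;
  c->failure_size = failure.size;
}
```

With this shape the implementer only routes bytes by RPC name; retry and worker-specific concerns would stay on the Rust side, which is the simplification the comment is arguing for.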

Member

Yeah, I generally agree. It's my bad for forgetting that we wanted to do that originally, since we had talked about it before. The main issue is just that it's a decent chunk of refactoring. Plumbing through the underlying generic thing will require touching a bunch of spots in the client crate, and allowing a https://docs.rs/tower-service/0.3.3/tower_service/trait.Service.html (I believe) to be plugged into the connection code here:

let channel = Channel::from_shared(self.target_url.to_string())?;

The type-level tomfoolery is probably significant. So, honestly, I'm kinda fine with this for now if it allows us to proceed. Alternatively, @FranzBusch, if you're not in a massive rush and you're interested in taking a crack at that, that would be fantastic. Otherwise I'm not sure we'd have time to fit that in super soon.

Contributor Author

I spent some time looking at whether we can drop the callback-based substitution one layer down, to the general Client level. Overall it seems like a good alternative approach, but it looks like it would require a significant amount of refactoring to get us there. The approach I took here, just allowing the worker client to be substituted, was significantly more straightforward since the protocol already existed. It was also enough for our use case, since I only cared about replacing the worker client and not the overall client.

I would love to go with what @Sushisource suggested here: proceed with the strategy of this PR and track the alternative approach in an issue for further down the road. If you all are fine with this, I will tackle the inline feedback on this PR.

Member

@cretz Jul 10, 2025

Hrmm, if it means we can reduce this bridge complexity and increase benefits for other things, I may be able to carve off time to confirm/deny whether replacing the gRPC call with a C call in clients is not too bad. We may be able to replace the tower service in some way. At least for me, if I can see it is non-trivial, it would make it easier to justify this worker-only approach. Regardless, if I can't confirm/deny in a timely manner, agreed we can move forward on this.

void *user_data;
void (*poll_workflow_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts,
Member

How are poll options expected to be used?

TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
Comment on lines +706 to +708
Member

Suggested change
- const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts,
- TemporalCoreWorkerClientCallbackTrampoline callback,
- void *callback_ud);
+ const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_options,
+ TemporalCoreWorkerClientCallbackTrampoline callback,
+ void *callback_user_data);

Pedantic, but I would like not to abbreviate things that are exposed in the header.

void (*poll_activity_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*poll_nexus_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*complete_workflow_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*complete_activity_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*complete_nexus_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*record_activity_heartbeat)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*cancel_activity_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*fail_activity_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*fail_workflow_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*fail_nexus_task)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*get_workflow_execution_history)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*respond_legacy_query)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*describe_namespace)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_ud);
void (*shutdown_worker)(void *user_data,
struct TemporalCoreByteArrayRef request,
TemporalCoreWorkerClientCallbackTrampoline callback,
void *callback_user_data);
bool (*is_mock)(void *user_data);
} TemporalCoreCallbackBasedWorkerClientRPCs;
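
As an illustration of how one entry of this table might be filled in, the sketch below implements `describe_namespace` and `is_mock` against a trimmed local copy of the struct. The shortened type names, the `Transport` struct, and the `my_*`, `make_rpcs`, and `record_outcome` functions are hypothetical. The response bytes are placeholders, and what the `failure` bytes should contain (a message or an encoded status) is an assumption, since the diff does not say.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-in for the bridge's non-owning byte slice. */
typedef struct ByteArrayRef {
  const uint8_t *data;
  size_t size;
} ByteArrayRef;

typedef void (*ResultCallback)(void *user_data, ByteArrayRef success,
                               ByteArrayRef failure);

/* Trimmed stand-in for the RPC table: two of its entries. */
typedef struct WorkerClientRPCs {
  void *user_data;
  void (*describe_namespace)(void *user_data, ByteArrayRef request,
                             ResultCallback callback, void *callback_user_data);
  bool (*is_mock)(void *user_data);
} WorkerClientRPCs;

/* Hypothetical transport state carried through `user_data`. */
typedef struct Transport {
  int requests_seen;
  bool fail_next;
} Transport;

void my_describe_namespace(void *user_data, ByteArrayRef request,
                           ResultCallback callback, void *callback_user_data) {
  /* Static storage keeps the buffers valid while the callback reads them. */
  static const uint8_t response[] = {0x0a, 0x00}; /* placeholder bytes */
  static const uint8_t error[] = "transport unavailable";
  ByteArrayRef empty = {NULL, 0};
  Transport *t = user_data;
  (void)request; /* a real implementation would send this */
  assert(t != NULL && callback != NULL);
  t->requests_seen++;
  if (t->fail_next) {
    ByteArrayRef failure = {error, sizeof error - 1};
    callback(callback_user_data, empty, failure); /* failure side only */
  } else {
    ByteArrayRef success = {response, sizeof response};
    callback(callback_user_data, success, empty); /* success side only */
  }
}

bool my_is_mock(void *user_data) {
  (void)user_data;
  return false;
}

WorkerClientRPCs make_rpcs(Transport *t) {
  WorkerClientRPCs rpcs = {t, my_describe_namespace, my_is_mock};
  return rpcs;
}

/* Stands in for the Rust side: records the sizes it was handed. */
typedef struct Outcome {
  size_t success_size;
  size_t failure_size;
} Outcome;

void record_outcome(void *user_data, ByteArrayRef success, ByteArrayRef failure) {
  Outcome *o = user_data;
  o->success_size = success.size;
  o->failure_size = failure.size;
}
```

Because the `ByteArrayRef`s are non-owning, the sketch keeps both buffers in static storage so they remain valid for the whole callback invocation. A real implementation would need some equivalent lifetime guarantee for its response buffer.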

/**
* Interop configuration of the [WorkerClientConfig].
*/
typedef struct TemporalCoreCallbackBasedWorkerClientConfig {
struct TemporalCoreByteArrayRef client_name;
struct TemporalCoreByteArrayRef client_version;
struct TemporalCoreByteArrayRef identity;
} TemporalCoreCallbackBasedWorkerClientConfig;

#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
@@ -812,6 +959,32 @@ void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id);
void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx,
void *token_ptr);

/**
* Create a new [Worker] with a custom callback-backed [temporal_sdk_core::WorkerClient] implementation.
*
* # Safety
* - All pointer args (`client_rpcs`, `options`, `client_config`, `runtime`) must be non-null and valid.
* - `system_info_ref.data` must point to at least `system_info_ref.size` bytes if non-zero.
* - Caller retains ownership; these must outlive the call.
*
* # Params
* - `client_rpcs`: pointer to [CallbackBasedWorkerClientRPCs] implementing Core SDK [temporal_sdk_core::WorkerClient] RPCs.
* - `options`: pointer to [WorkerOptions].
* - `client_config`: pointer to [CallbackBasedWorkerClientConfig] (client name, version, identity).
* - `system_info_ref`: zero-copy [GetSystemInfoResponse] bytes or empty.
* - `runtime`: pointer to [Runtime].
*
* # Returns
* A [WorkerOrFail] struct containing:
* - `worker`: on success, a heap-allocated [Worker] pointer; null on error.
* - `fail`: on error, a C string (UTF-8) describing the failure; null on success.
*/
struct TemporalCoreWorkerOrFail temporal_core_new_callback_based_worker_client(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs,
Member

Suggested change
- struct TemporalCoreWorkerOrFail temporal_core_new_callback_based_worker_client(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs,
+ struct TemporalCoreWorkerOrFail temporal_core_callback_based_worker_new(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs,

or

Suggested change
- struct TemporalCoreWorkerOrFail temporal_core_new_callback_based_worker_client(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs,
+ struct TemporalCoreWorkerOrFail temporal_core_worker_callback_based_new(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs,

Pedantic, but worth being clear this is creating a worker, not a client.

const struct TemporalCoreWorkerOptions *options,
const struct TemporalCoreCallbackBasedWorkerClientConfig *client_config,
Member

Pedantic, but I would be OK putting all other non-runtime parameters in this structure instead of passing them as separate parameters, and calling it TemporalCoreCallbackBasedWorkerOptions instead of Config. I think this is consistent with what we've done elsewhere. But not a big deal.

struct TemporalCoreByteArrayRef system_info_ref,
struct TemporalCoreRuntime *runtime);

#ifdef __cplusplus
} // extern "C"
#endif // __cplusplus