-
Notifications
You must be signed in to change notification settings - Fork 90
Introduce a new callback based bridge worker #955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,6 +5,29 @@ | |||||||||||||
#include <stdint.h> | ||||||||||||||
#include <stdlib.h> | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Interop mirror of the Rust tonic [tonic::Code] enum | ||||||||||||||
*/ | ||||||||||||||
typedef enum TemporalCoreCallbackBasedWorkerClientGRPCCode { | ||||||||||||||
Ok = 0, | ||||||||||||||
Cancelled = 1, | ||||||||||||||
Unknown = 2, | ||||||||||||||
InvalidArgument = 3, | ||||||||||||||
DeadlineExceeded = 4, | ||||||||||||||
NotFound = 5, | ||||||||||||||
AlreadyExists = 6, | ||||||||||||||
PermissionDenied = 7, | ||||||||||||||
ResourceExhausted = 8, | ||||||||||||||
FailedPrecondition = 9, | ||||||||||||||
Aborted = 10, | ||||||||||||||
OutOfRange = 11, | ||||||||||||||
Unimplemented = 12, | ||||||||||||||
Internal = 13, | ||||||||||||||
Unavailable = 14, | ||||||||||||||
DataLoss = 15, | ||||||||||||||
Unauthenticated = 16, | ||||||||||||||
} TemporalCoreCallbackBasedWorkerClientGRPCCode; | ||||||||||||||
|
||||||||||||||
typedef enum TemporalCoreForwardedLogLevel { | ||||||||||||||
Trace = 0, | ||||||||||||||
Debug, | ||||||||||||||
|
@@ -629,6 +652,130 @@ typedef struct TemporalCoreWorkerReplayPushResult { | |||||||||||||
const struct TemporalCoreByteArray *fail; | ||||||||||||||
} TemporalCoreWorkerReplayPushResult; | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Interop function pointer signature of the retry predicate of [CallbackBasedWorkerClientPollOptions] | ||||||||||||||
*/ | ||||||||||||||
typedef bool (*TemporalCoreCallbackBasedWorkerClientNoRetryPredicate)(void *user_data, | ||||||||||||||
enum TemporalCoreCallbackBasedWorkerClientGRPCCode code); | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Interop mirror of [temporal_sdk_core::PollOptions] | ||||||||||||||
*/ | ||||||||||||||
typedef struct TemporalCoreCallbackBasedWorkerClientPollOptions { | ||||||||||||||
/** | ||||||||||||||
* always non-null (could be empty) | ||||||||||||||
*/ | ||||||||||||||
struct TemporalCoreByteArrayRef task_queue; | ||||||||||||||
/** | ||||||||||||||
* true if `no_retry_predicate` is valid | ||||||||||||||
*/ | ||||||||||||||
bool has_no_retry; | ||||||||||||||
/** | ||||||||||||||
* Decide short‐circuit GRPC error | ||||||||||||||
*/ | ||||||||||||||
TemporalCoreCallbackBasedWorkerClientNoRetryPredicate no_retry_predicate; | ||||||||||||||
/** | ||||||||||||||
* Opaque context pointer passed back to your predicate thunk | ||||||||||||||
*/ | ||||||||||||||
void *no_retry_user_data; | ||||||||||||||
/** | ||||||||||||||
* true if `timeout_override.is_some()` | ||||||||||||||
*/ | ||||||||||||||
bool has_timeout_ms; | ||||||||||||||
/** | ||||||||||||||
* timeout in milliseconds | ||||||||||||||
*/ | ||||||||||||||
uint64_t timeout_ms; | ||||||||||||||
} TemporalCoreCallbackBasedWorkerClientPollOptions; | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Function pointer for RPC callbacks delivering a `(success, failure)` | ||||||||||||||
* pair as non-owning `ByteArrayRef`s to Rust. | ||||||||||||||
*/ | ||||||||||||||
typedef void (*TemporalCoreWorkerClientCallbackTrampoline)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef success, | ||||||||||||||
struct TemporalCoreByteArrayRef failure); | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Interop function pointer table implementing the RPC logic of the [temporal_sdk_core::WorkerClient] trait via callbacks. | ||||||||||||||
*/ | ||||||||||||||
typedef struct TemporalCoreCallbackBasedWorkerClientRPCs { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, thinking on this, I wonder if we should just have a general lower-level callback-based client and nothing specific to workers. So basically a C abstraction for gRPC client. So basically have a I know we had mentioned this approach at #883, but instead at #924 we're exposing the single worker client trait. I think it'd be wiser to make a C bridge for gRPC client in general. This would allow our SDKs to let people just replace the gRPC implementation at client creation time instead of worker creation time. It would also allow our SDKs to provide gRPC interception if they wanted (by delegating to another client), though we could make a bit of sugar to make delegating to the default implementation easier at that time. Thoughts @FranzBusch and @Sushisource? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I generally agree. I think it's my bad for forgetting that we wanted to do that originally. Because we had talked about it before. I think the main issue is just that it's a decent chunk of refactoring I think. Plumbing through the underlying generic thing will require touching a bunch of spots in the client crate, and allowing plugging in a https://docs.rs/tower-service/0.3.3/tower_service/trait.Service.html (I believe) into the connection code here Line 435 in 42cc51a
The type-level tomfoolery is probably significant. So, honestly, I'm kinda fine with this for now if it allows us to proceed. Alternatively, @FranzBusch, if you're not in a massive rush, and you're interested in taking a crack at that, that would be fantastic. Otherwise I'm not sure we'd have time to fit that in super soon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time looking to see if we can drop the callback based substitution one layer down to general I would love to proceed with what @Sushisource suggested here and proceed with the strategy of this PR and we can track the alternative approach in an issue for further down the road. If you all are fine with this I will tackle the inline feedback on this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hrmm, if it means we can reduce this bridge complexity and increase benefits for other things, I may be able to carve off time to see if I can confirm/deny replacing the gRPC call with a C call in clients is not too bad. May be able to replace the tower service in some way. At least for me, if I can see it is non-trivial, it would make it easier to justify this worker-only approach. Regardless, if I can't confirm/deny in a timely manner, agreed we can move forward on this. |
||||||||||||||
void *user_data; | ||||||||||||||
void (*poll_workflow_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How are poll options expected to be used? |
||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
Comment on lines
+706
to
+708
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Pedantic, but would like not to abbreviate things that are exposed in the header |
||||||||||||||
void (*poll_activity_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*poll_nexus_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
const struct TemporalCoreCallbackBasedWorkerClientPollOptions *poll_opts, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*complete_workflow_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*complete_activity_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*complete_nexus_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*record_activity_heartbeat)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*cancel_activity_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*fail_activity_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*fail_workflow_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*fail_nexus_task)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*get_workflow_execution_history)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*respond_legacy_query)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*describe_namespace)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_ud); | ||||||||||||||
void (*shutdown_worker)(void *user_data, | ||||||||||||||
struct TemporalCoreByteArrayRef request, | ||||||||||||||
TemporalCoreWorkerClientCallbackTrampoline callback, | ||||||||||||||
void *callback_user_data); | ||||||||||||||
bool (*is_mock)(void *user_data); | ||||||||||||||
} TemporalCoreCallbackBasedWorkerClientRPCs; | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Interop configuration of the [WorkerClientConfig]. | ||||||||||||||
*/ | ||||||||||||||
typedef struct TemporalCoreCallbackBasedWorkerClientConfig { | ||||||||||||||
struct TemporalCoreByteArrayRef client_name; | ||||||||||||||
struct TemporalCoreByteArrayRef client_version; | ||||||||||||||
struct TemporalCoreByteArrayRef identity; | ||||||||||||||
} TemporalCoreCallbackBasedWorkerClientConfig; | ||||||||||||||
|
||||||||||||||
#ifdef __cplusplus | ||||||||||||||
extern "C" { | ||||||||||||||
#endif // __cplusplus | ||||||||||||||
|
@@ -812,6 +959,32 @@ void temporal_core_complete_async_reserve(void *sender, uintptr_t permit_id); | |||||||||||||
void temporal_core_set_reserve_cancel_target(struct TemporalCoreSlotReserveCtx *ctx, | ||||||||||||||
void *token_ptr); | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* Create a new [Worker] with a custom callback-backed [temporal_sdk_core::WorkerClient] implementation. | ||||||||||||||
* | ||||||||||||||
* # Safety | ||||||||||||||
* - All pointer args (`client_rpcs`, `options`, `client_config`, `runtime`) must be non-null and valid. | ||||||||||||||
* - `system_info_ref.data` must point to at least `system_info_ref.size` bytes if non-zero. | ||||||||||||||
* - Caller retains ownership; these must outlive the call. | ||||||||||||||
* | ||||||||||||||
* # Params | ||||||||||||||
* - `client_rpcs`: pointer to [CallbackBasedWorkerClientRPCs] implementing Core SDK [temporal_sdk_core::WorkerClient] RPCs. | ||||||||||||||
* - `options`: pointer to [WorkerOptions]. | ||||||||||||||
* - `client_config`: pointer to [CallbackBasedWorkerClientConfig] (client name, version, identity). | ||||||||||||||
* - `system_info_ref`: zero-copy [GetSystemInfoResponse] bytes or empty. | ||||||||||||||
* - `runtime`: pointer to [Runtime]. | ||||||||||||||
* | ||||||||||||||
* # Returns | ||||||||||||||
* A [WorkerOrFail] struct containing: | ||||||||||||||
* - `worker`: on success, a heap-allocated [Worker] pointer; null on error. | ||||||||||||||
* - `fail`: on error, a C string (UTF-8) describing the failure; null on success. | ||||||||||||||
*/ | ||||||||||||||
struct TemporalCoreWorkerOrFail temporal_core_new_callback_based_worker_client(const struct TemporalCoreCallbackBasedWorkerClientRPCs *client_rpcs, | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
or
Suggested change
Pedantic, but worth being clear this is creating a worker not a client |
||||||||||||||
const struct TemporalCoreWorkerOptions *options, | ||||||||||||||
const struct TemporalCoreCallbackBasedWorkerClientConfig *client_config, | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pedantic, but would be ok putting all other non-runtime parameters in this structure instead of as separate parameters. And call it |
||||||||||||||
struct TemporalCoreByteArrayRef system_info_ref, | ||||||||||||||
struct TemporalCoreRuntime *runtime); | ||||||||||||||
|
||||||||||||||
#ifdef __cplusplus | ||||||||||||||
} // extern "C" | ||||||||||||||
#endif // __cplusplus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pedantic, but we have not traditionally called this a trampoline in this C header, can it just be called a callback?