Skip to content

Commit 319fe2e

Browse files
authored
Update and signal with start (#201)
1 parent 7162d3e commit 319fe2e

16 files changed

+831
-18
lines changed

temporalio/.rubocop.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ Metrics/AbcSize:
4949
Metrics/BlockLength:
5050
Max: 100
5151

52+
# The default is too small
53+
Metrics/BlockNesting:
54+
Max: 5
55+
5256
# The default is too small
5357
Metrics/ClassLength:
5458
Max: 1000
@@ -59,7 +63,7 @@ Metrics/CyclomaticComplexity:
5963

6064
# The default is too small
6165
Metrics/MethodLength:
62-
Max: 100
66+
Max: 200
6367

6468
# The default is too small
6569
Metrics/ModuleLength:

temporalio/lib/temporalio/client.rb

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
require 'temporalio/client/interceptor'
99
require 'temporalio/client/schedule'
1010
require 'temporalio/client/schedule_handle'
11+
require 'temporalio/client/with_start_workflow_operation'
1112
require 'temporalio/client/workflow_execution'
1213
require 'temporalio/client/workflow_execution_count'
1314
require 'temporalio/client/workflow_handle'
1415
require 'temporalio/client/workflow_query_reject_condition'
16+
require 'temporalio/client/workflow_update_handle'
17+
require 'temporalio/client/workflow_update_wait_stage'
1518
require 'temporalio/common_enums'
1619
require 'temporalio/converters'
1720
require 'temporalio/error'
@@ -155,7 +158,7 @@ def initialize(
155158
default_workflow_query_reject_condition:
156159
).freeze
157160
# Initialize interceptors
158-
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int|
161+
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore
159162
int.intercept_client(acc)
160163
end
161164
end
@@ -334,6 +337,106 @@ def workflow_handle(
334337
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
335338
end
336339

340+
# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
341+
# policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be
342+
# retrieved on the start workflow operation.
343+
#
344+
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
345+
# @param args [Array<Object>] Update arguments.
346+
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
347+
# have an `id_conflict_policy` set.
348+
# @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not
349+
# currently supported. See https://docs.temporal.io/workflows#update for more details.
350+
# @param id [String] ID of the update.
351+
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
352+
#
353+
# @return [WorkflowUpdateHandle] The update handle.
354+
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
355+
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
356+
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
357+
# @raise [Error::RPCError] RPC error from call.
358+
def start_update_with_start_workflow(
359+
update,
360+
*args,
361+
start_workflow_operation:,
362+
wait_for_stage:,
363+
id: SecureRandom.uuid,
364+
rpc_options: nil
365+
)
366+
@impl.start_update_with_start_workflow(
367+
Interceptor::StartUpdateWithStartWorkflowInput.new(
368+
update_id: id,
369+
update:,
370+
args:,
371+
wait_for_stage:,
372+
start_workflow_operation:,
373+
headers: {},
374+
rpc_options:
375+
)
376+
)
377+
end
378+
379+
# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
380+
# policy), and wait for update result. This is a shortcut for {start_update_with_start_workflow} +
381+
# {WorkflowUpdateHandle.result}.
382+
#
383+
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
384+
# @param args [Array<Object>] Update arguments.
385+
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
386+
# have an `id_conflict_policy` set.
387+
# @param id [String] ID of the update.
388+
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
389+
#
390+
# @return [Object] Successful update result.
391+
# @raise [Error::WorkflowUpdateFailedError] If the update failed.
392+
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
393+
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
394+
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
395+
# @raise [Error::RPCError] RPC error from call.
396+
def execute_update_with_start_workflow(
397+
update,
398+
*args,
399+
start_workflow_operation:,
400+
id: SecureRandom.uuid,
401+
rpc_options: nil
402+
)
403+
start_update_with_start_workflow(
404+
update,
405+
*args,
406+
start_workflow_operation:,
407+
wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
408+
id:,
409+
rpc_options:
410+
).result
411+
end
412+
413+
# Send a signal, possibly starting the workflow at the same time if it doesn't exist.
414+
#
415+
# @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name.
416+
# @param args [Array<Object>] Signal arguments.
417+
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not
418+
# support all `id_conflict_policy` options.
419+
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
420+
#
421+
# @return [WorkflowHandle] A workflow handle to the workflow.
422+
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
423+
# @raise [Error::RPCError] RPC error from call.
424+
def signal_with_start_workflow(
425+
signal,
426+
*args,
427+
start_workflow_operation:,
428+
rpc_options: nil
429+
)
430+
@impl.signal_with_start_workflow(
431+
Interceptor::SignalWithStartWorkflowInput.new(
432+
signal:,
433+
args:,
434+
start_workflow_operation:,
435+
rpc_options:
436+
)
437+
)
438+
end
439+
337440
# List workflows.
338441
#
339442
# @param query [String, nil] A Temporal visibility list filter.

temporalio/lib/temporalio/client/interceptor.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,26 @@ def intercept_client(next_interceptor)
3838
:rpc_options
3939
)
4040

41+
# Input for {Outbound.start_update_with_start_workflow}.
42+
StartUpdateWithStartWorkflowInput = Data.define(
43+
:update_id,
44+
:update,
45+
:args,
46+
:wait_for_stage,
47+
:start_workflow_operation,
48+
:headers,
49+
:rpc_options
50+
)
51+
52+
# Input for {Outbound.signal_with_start_workflow}.
53+
SignalWithStartWorkflowInput = Data.define(
54+
:signal,
55+
:args,
56+
:start_workflow_operation,
57+
# Headers intentionally not defined here, because they are not separate from start_workflow_operation
58+
:rpc_options
59+
)
60+
4161
# Input for {Outbound.list_workflows}.
4262
ListWorkflowsInput = Data.define(
4363
:query,
@@ -240,6 +260,23 @@ def start_workflow(input)
240260
next_interceptor.start_workflow(input)
241261
end
242262

263+
# Called for every {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow}
264+
# call.
265+
#
266+
# @param input [StartUpdateWithStartWorkflowInput] Input.
267+
# @return [WorkflowUpdateHandle] Workflow update handle.
268+
def start_update_with_start_workflow(input)
269+
next_interceptor.start_update_with_start_workflow(input)
270+
end
271+
272+
# Called for every {Client.signal_with_start_workflow}.
273+
#
274+
# @param input [SignalWithStartWorkflowInput] Input.
275+
# @return [WorkflowHandle] Workflow handle.
276+
def signal_with_start_workflow(input)
277+
next_interceptor.signal_with_start_workflow(input)
278+
end
279+
243280
# Called for every {Client.list_workflows} call.
244281
#
245282
# @param input [ListWorkflowsInput] Input.
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/common_enums'
4+
5+
module Temporalio
6+
class Client
7+
# Start operation used by {Client.start_update_with_start_workflow}, {Client.execute_update_with_start_workflow},
8+
# and {Client.signal_with_start_workflow}.
9+
class WithStartWorkflowOperation
10+
Options = Data.define(
11+
:workflow,
12+
:args,
13+
:id,
14+
:task_queue,
15+
:execution_timeout,
16+
:run_timeout,
17+
:task_timeout,
18+
:id_reuse_policy,
19+
:id_conflict_policy,
20+
:retry_policy,
21+
:cron_schedule,
22+
:memo,
23+
:search_attributes,
24+
:start_delay,
25+
:headers
26+
)
27+
28+
# Options the operation was created with.
29+
class Options; end # rubocop:disable Lint/EmptyClass
30+
31+
# @return [Options] Options the operation was created with.
32+
attr_accessor :options
33+
34+
# Create a with-start workflow operation. These are mostly the same options as {Client.start_workflow}, see that
35+
# documentation for more details.
36+
#
37+
# Note, for {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow},
38+
# `id_conflict_policy` is required.
39+
def initialize(
40+
workflow,
41+
*args,
42+
id:,
43+
task_queue:,
44+
execution_timeout: nil,
45+
run_timeout: nil,
46+
task_timeout: nil,
47+
id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
48+
id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED,
49+
retry_policy: nil,
50+
cron_schedule: nil,
51+
memo: nil,
52+
search_attributes: nil,
53+
start_delay: nil,
54+
headers: {}
55+
)
56+
@options = Options.new(
57+
workflow:,
58+
args:,
59+
id:,
60+
task_queue:,
61+
execution_timeout:,
62+
run_timeout:,
63+
task_timeout:,
64+
id_reuse_policy:,
65+
id_conflict_policy:,
66+
retry_policy:,
67+
cron_schedule:,
68+
memo:,
69+
search_attributes:,
70+
start_delay:,
71+
headers:
72+
)
73+
@workflow_handle_mutex = Mutex.new
74+
@workflow_handle_cond_var = ConditionVariable.new
75+
end
76+
77+
# Get the workflow handle, possibly waiting until set, or raise an error if the workflow start was unsuccessful.
78+
#
79+
# @param wait [Boolean] True to wait until it is set, false to return immediately.
80+
#
81+
# @return [WorkflowHandle, nil] The workflow handle when available or `nil` if `wait` is false and it is not set
82+
# yet.
83+
# @raise [Error] Any error that occurred during the call before the workflow start returned.
84+
def workflow_handle(wait: true)
85+
@workflow_handle_mutex.synchronize do
86+
@workflow_handle_cond_var.wait(@workflow_handle_mutex) unless @workflow_handle || !wait
87+
raise @workflow_handle if @workflow_handle.is_a?(Exception)
88+
89+
@workflow_handle
90+
end
91+
end
92+
93+
# @!visibility private
94+
def _set_workflow_handle(value)
95+
@workflow_handle_mutex.synchronize do
96+
@workflow_handle ||= value
97+
@workflow_handle_cond_var.broadcast
98+
end
99+
end
100+
101+
# @!visibility private
102+
def _mark_used
103+
raise 'Start operation already used' if @in_use
104+
105+
@in_use = true
106+
end
107+
end
108+
end
109+
end

temporalio/lib/temporalio/error.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def grpc_status
124124

125125
def create_grpc_status
126126
return Api::Common::V1::GrpcStatus.new(code: @code) unless @raw_grpc_status
127+
return @raw_grpc_status if @raw_grpc_status.is_a?(Api::Common::V1::GrpcStatus)
127128

128129
Api::Common::V1::GrpcStatus.decode(@raw_grpc_status)
129130
end

0 commit comments

Comments
 (0)