Skip to content

Commit e7f19b9

Browse files
authored
Reserve certain prefixes (#221)
Fixes #195
1 parent 66d68ea commit e7f19b9

File tree

13 files changed

+291
-43
lines changed

13 files changed

+291
-43
lines changed

temporalio/lib/temporalio/activity/definition.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# frozen_string_literal: true
22

3+
require 'temporalio/internal/proto_utils'
4+
35
module Temporalio
46
module Activity
57
# Base class for all activities.
@@ -182,6 +184,7 @@ def initialize(
182184
@executor = executor
183185
@cancel_raise = cancel_raise
184186
@raw_args = raw_args
187+
Internal::ProtoUtils.assert_non_reserved_name(name)
185188
end
186189
end
187190
end

temporalio/lib/temporalio/internal/proto_utils.rb

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def self.memo_to_proto_hash(hash, converter)
5151
end
5252

5353
def self.memo_from_proto(memo, converter)
54-
return nil if memo.nil? || memo.fields.size.zero? # rubocop:disable Style/ZeroLengthPredicate Google Maps don't have empty
54+
return nil if memo.nil? || memo.fields.size.zero? # rubocop:disable Style/ZeroLengthPredicate -- Google Maps don't have empty
5555

5656
memo.fields.each_with_object({}) { |(key, val), h| h[key] = converter.from_payload(val) } # rubocop:disable Style/HashTransformValues
5757
end
@@ -73,7 +73,7 @@ def self.headers_from_proto(headers, converter)
7373
end
7474

7575
def self.headers_from_proto_map(headers, converter)
76-
return nil if headers.nil? || headers.size.zero? # rubocop:disable Style/ZeroLengthPredicate Google Maps don't have empty
76+
return nil if headers.nil? || headers.size.zero? # rubocop:disable Style/ZeroLengthPredicate -- Google Maps don't have empty
7777

7878
headers.each_with_object({}) do |(key, val), h| # rubocop:disable Style/HashTransformValues
7979
# @type var h: Hash[String, Object?]
@@ -106,6 +106,22 @@ def self.convert_to_payload_array(converter, values)
106106
converter.to_payloads(values).payloads.to_ary
107107
end
108108

109+
def self.assert_non_reserved_name(name)
110+
name = name&.to_s # In case it's a symbol or not present
111+
return unless name
112+
raise "'#{name}' cannot start with '__temporal_'" if name.start_with?('__temporal_')
113+
# Might as well disable __stack_trace and __enhanced_stack_trace everywhere even though technically it's only
114+
# reserved for queries
115+
raise "'#{name}' name invalid" if name == '__stack_trace' || name == '__enhanced_stack_trace'
116+
end
117+
118+
def self.reserved_name?(name)
119+
name = name&.to_s # In case it's a symbol or not present
120+
return false unless name
121+
122+
name.start_with?('__temporal_') || name == '__stack_trace' || name == '__enhanced_stack_trace'
123+
end
124+
109125
class LazyMemo
110126
def initialize(raw_memo, converter)
111127
@raw_memo = raw_memo

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,10 @@ def handle_task(task)
9393
def handle_start_task(task_token, start)
9494
set_running_activity(task_token, nil)
9595

96-
# Find activity definition, falling back to dynamic if present
97-
defn = @activities[start.activity_type] || @activities[nil]
96+
# Find activity definition, falling back to dynamic if not found and not reserved name
97+
defn = @activities[start.activity_type]
98+
defn = @activities[nil] if !defn && !Internal::ProtoUtils.reserved_name?(start.activity_type)
99+
98100
if defn.nil?
99101
raise Error::ApplicationError.new(
100102
"Activity #{start.activity_type} for workflow #{start.workflow_execution.workflow_id} " \
@@ -114,7 +116,7 @@ def handle_start_task(task_token, start)
114116
# Unset at the end
115117
Activity::Context._current_executor = nil
116118
end
117-
rescue Exception => e # rubocop:disable Lint/RescueException We are intending to catch everything here
119+
rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
118120
remove_running_activity(task_token)
119121
@scoped_logger.warn("Failed starting activity #{start.activity_type}")
120122
@scoped_logger.warn(e)
@@ -212,7 +214,7 @@ def execute_activity(task_token, defn, start)
212214
Activity::Context._current_executor&.set_activity_context(defn, activity)
213215
set_running_activity(task_token, activity)
214216
run_activity(defn, activity, input)
215-
rescue Exception => e # rubocop:disable Lint/RescueException We are intending to catch everything here
217+
rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
216218
@scoped_logger.warn("Failed starting or sending completion for activity #{start.activity_type}")
217219
@scoped_logger.warn(e)
218220
# This means that the activity couldn't start or send completion (run
@@ -259,7 +261,7 @@ def run_activity(defn, activity, input)
259261
result: @worker.options.client.data_converter.to_payload(result)
260262
)
261263
)
262-
rescue Exception => e # rubocop:disable Lint/RescueException We are intending to catch everything here
264+
rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
263265
if e.is_a?(Activity::CompleteAsyncError)
264266
# Wanting to complete async
265267
@scoped_logger.debug('Completing activity asynchronously')

temporalio/lib/temporalio/internal/worker/multi_runner.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def apply_thread_or_fiber_block(&)
3434
rescue InjectEventForTesting => e
3535
@queue.push(e.event)
3636
@queue.push(Event::BlockSuccess.new(result: e))
37-
rescue Exception => e # rubocop:disable Lint/RescueException Intentionally catch all
37+
rescue Exception => e # rubocop:disable Lint/RescueException -- Intentionally catch all
3838
@queue.push(Event::BlockFailure.new(error: e))
3939
end
4040
else
@@ -43,7 +43,7 @@ def apply_thread_or_fiber_block(&)
4343
rescue InjectEventForTesting => e
4444
@queue.push(e.event)
4545
@queue.push(Event::BlockSuccess.new(result: e))
46-
rescue Exception => e # rubocop:disable Lint/RescueException Intentionally catch all
46+
rescue Exception => e # rubocop:disable Lint/RescueException -- Intentionally catch all
4747
@queue.push(Event::BlockFailure.new(error: e))
4848
end
4949
end

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,10 @@ def apply(job)
348348
end
349349

350350
def apply_signal(job)
351-
defn = signal_handlers[job.signal_name] || signal_handlers[nil]
351+
# Get signal definition, falling back to dynamic if not present and not reserved
352+
defn = signal_handlers[job.signal_name]
353+
defn = signal_handlers[nil] if !defn && !Internal::ProtoUtils.reserved_name?(job.signal_name)
354+
352355
handler_exec =
353356
if defn
354357
HandlerExecution.new(name: job.signal_name, update_id: nil, unfinished_policy: defn.unfinished_policy)
@@ -381,37 +384,38 @@ def apply_signal(job)
381384
end
382385

383386
def apply_query(job)
384-
# TODO(cretz): __temporal_workflow_metadata
385-
defn = case job.query_type
386-
when '__stack_trace'
387-
Workflow::Definition::Query.new(
388-
name: '__stack_trace',
389-
to_invoke: proc { scheduler.stack_trace }
390-
)
391-
else
392-
query_handlers[job.query_type] || query_handlers[nil]
393-
end
394387
schedule do
395-
unless defn
396-
raise "Query handler for #{job.query_type} expected but not found, " \
397-
"known queries: [#{query_handlers.keys.compact.sort.join(', ')}]"
398-
end
388+
# If it's a built-in, run it without interceptors, otherwise do normal behavior
389+
# TODO(cretz): __temporal_workflow_metadata
390+
result = if job.query_type == '__stack_trace'
391+
scheduler.stack_trace
392+
else
393+
# Get query definition, falling back to dynamic if not present and not reserved
394+
defn = query_handlers[job.query_type]
395+
defn = query_handlers[nil] if !defn && !Internal::ProtoUtils.reserved_name?(job.query_type)
396+
397+
unless defn
398+
raise "Query handler for #{job.query_type} expected but not found, " \
399+
"known queries: [#{query_handlers.keys.compact.sort.join(', ')}]"
400+
end
401+
402+
with_context_frozen do
403+
@inbound.handle_query(
404+
Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new(
405+
id: job.query_id,
406+
query: job.query_type,
407+
args: begin
408+
convert_handler_args(payload_array: job.arguments, defn:)
409+
rescue StandardError => e
410+
raise "Failed converting query input arguments: #{e}"
411+
end,
412+
definition: defn,
413+
headers: ProtoUtils.headers_from_proto_map(job.headers, @payload_converter) || {}
414+
)
415+
)
416+
end
417+
end
399418

400-
result = with_context_frozen do
401-
@inbound.handle_query(
402-
Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new(
403-
id: job.query_id,
404-
query: job.query_type,
405-
args: begin
406-
convert_handler_args(payload_array: job.arguments, defn:)
407-
rescue StandardError => e
408-
raise "Failed converting query input arguments: #{e}"
409-
end,
410-
definition: defn,
411-
headers: ProtoUtils.headers_from_proto_map(job.headers, @payload_converter) || {}
412-
)
413-
)
414-
end
415419
add_command(
416420
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
417421
respond_to_query: Bridge::Api::WorkflowCommands::QueryResult.new(
@@ -435,7 +439,10 @@ def apply_query(job)
435439
end
436440

437441
def apply_update(job)
438-
defn = update_handlers[job.name] || update_handlers[nil]
442+
# Get update definition, falling back to dynamic if not present and not reserved
443+
defn = update_handlers[job.name]
444+
defn = update_handlers[nil] if !defn && !Internal::ProtoUtils.reserved_name?(job.name)
445+
439446
handler_exec =
440447
(HandlerExecution.new(name: job.name, update_id: job.id, unfinished_policy: defn.unfinished_policy) if defn)
441448
schedule(handler_exec:) do

temporalio/lib/temporalio/testing/activity_environment.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def run(activity, *args)
9898
client: @client
9999
))
100100
queue.push([defn.proc.call(*args), nil])
101-
rescue Exception => e # rubocop:disable Lint/RescueException Intentionally capturing all exceptions
101+
rescue Exception => e # rubocop:disable Lint/RescueException -- Intentionally capturing all exceptions
102102
queue.push([nil, e])
103103
ensure
104104
executor.set_activity_context(defn, nil)

temporalio/lib/temporalio/worker.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require 'temporalio/error'
77
require 'temporalio/internal/bridge'
88
require 'temporalio/internal/bridge/worker'
9+
require 'temporalio/internal/proto_utils'
910
require 'temporalio/internal/worker/activity_worker'
1011
require 'temporalio/internal/worker/multi_runner'
1112
require 'temporalio/internal/worker/workflow_instance'
@@ -381,6 +382,8 @@ def initialize(
381382
)
382383
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
383384

385+
Internal::ProtoUtils.assert_non_reserved_name(task_queue)
386+
384387
@options = Options.new(
385388
client:,
386389
task_queue:,

temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,12 @@ def create_instance(initial_activation, worker_state)
186186
raise 'Missing initialize job in initial activation' unless init_job
187187

188188
# Obtain definition
189-
definition = worker_state.workflow_definitions[init_job.workflow_type] ||
190-
worker_state.workflow_definitions[nil]
189+
definition = worker_state.workflow_definitions[init_job.workflow_type]
190+
# If not present and not reserved, try dynamic
191+
if !definition && !Internal::ProtoUtils.reserved_name?(init_job.workflow_type)
192+
definition = worker_state.workflow_definitions[nil]
193+
end
194+
191195
unless definition
192196
raise Error::ApplicationError.new(
193197
"Workflow type #{init_job.workflow_type} is not registered on this worker, available workflows: " +

temporalio/lib/temporalio/workflow/definition.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# frozen_string_literal: true
22

3+
require 'temporalio/internal/proto_utils'
34
require 'temporalio/workflow'
45
require 'temporalio/workflow/handler_unfinished_policy'
56

@@ -430,6 +431,7 @@ def initialize(
430431
@signals = signals.dup.freeze
431432
@queries = queries.dup.freeze
432433
@updates = updates.dup.freeze
434+
Internal::ProtoUtils.assert_non_reserved_name(name)
433435
end
434436

435437
# @return [String] Workflow name.
@@ -473,6 +475,7 @@ def initialize(
473475
@to_invoke = to_invoke
474476
@raw_args = raw_args
475477
@unfinished_policy = unfinished_policy
478+
Internal::ProtoUtils.assert_non_reserved_name(name)
476479
end
477480
end
478481

@@ -507,6 +510,7 @@ def initialize(
507510
@name = name
508511
@to_invoke = to_invoke
509512
@raw_args = raw_args
513+
Internal::ProtoUtils.assert_non_reserved_name(name)
510514
end
511515
end
512516

@@ -548,6 +552,7 @@ def initialize(
548552
@raw_args = raw_args
549553
@unfinished_policy = unfinished_policy
550554
@validator_to_invoke = validator_to_invoke
555+
Internal::ProtoUtils.assert_non_reserved_name(name)
551556
end
552557

553558
# @!visibility private

temporalio/sig/temporalio/internal/proto_utils.rbs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ module Temporalio
5959
Array[Object?] values
6060
) -> Array[untyped]
6161

62+
def self.assert_non_reserved_name: (String | Symbol | nil name) -> void
63+
def self.reserved_name?: (String | Symbol | nil name) -> bool
64+
6265
class LazyMemo
6366
def initialize: (
6467
untyped? raw_memo,

0 commit comments

Comments
 (0)