Skip to content

Commit 90d90bd

Browse files
authored
Support unsafe IO/io_wait inside workflows (#243)
Fixes #239
1 parent 5af1ad4 commit 90d90bd

File tree

19 files changed

+121
-12
lines changed

19 files changed

+121
-12
lines changed

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
5757
:failure_converter, :cancellation, :continue_as_new_suggested, :current_history_length,
5858
:current_history_size, :replaying, :random, :signal_handlers, :query_handlers, :update_handlers,
5959
:context_frozen
60-
attr_accessor :current_details
60+
attr_accessor :io_enabled, :current_details
6161

6262
def initialize(details)
6363
# Initialize general state
@@ -68,6 +68,7 @@ def initialize(details)
6868
@logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
6969
@logger.scoped_values_getter = proc { scoped_logger_info }
7070
@runtime_metric_meter = details.metric_meter
71+
@io_enabled = details.unsafe_workflow_io_enabled
7172
@scheduler = Scheduler.new(self)
7273
@payload_converter = details.payload_converter
7374
@failure_converter = details.failure_converter

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,16 @@ def initialize_continue_as_new_error(error)
164164
)
165165
end
166166

167+
def io_enabled(&)
168+
prev = @instance.io_enabled
169+
@instance.io_enabled = true
170+
begin
171+
yield
172+
ensure
173+
@instance.io_enabled = prev
174+
end
175+
end
176+
167177
def logger
168178
@instance.logger
169179
end

temporalio/lib/temporalio/internal/worker/workflow_instance/details.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class WorkflowInstance
88
class Details
99
attr_reader :namespace, :task_queue, :definition, :initial_activation, :logger, :metric_meter,
1010
:payload_converter, :failure_converter, :interceptors, :disable_eager_activity_execution,
11-
:illegal_calls, :workflow_failure_exception_types
11+
:illegal_calls, :workflow_failure_exception_types, :unsafe_workflow_io_enabled
1212

1313
def initialize(
1414
namespace:,
@@ -22,7 +22,8 @@ def initialize(
2222
interceptors:,
2323
disable_eager_activity_execution:,
2424
illegal_calls:,
25-
workflow_failure_exception_types:
25+
workflow_failure_exception_types:,
26+
unsafe_workflow_io_enabled:
2627
)
2728
@namespace = namespace
2829
@task_queue = task_queue
@@ -36,6 +37,7 @@ def initialize(
3637
@disable_eager_activity_execution = disable_eager_activity_execution
3738
@illegal_calls = illegal_calls
3839
@workflow_failure_exception_types = workflow_failure_exception_types
40+
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled
3941
end
4042
end
4143
end

temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,28 @@ def fiber(&block)
137137
end
138138

139139
def io_wait(io, events, timeout)
140-
# TODO(cretz): This in a blocking fashion?
141-
raise NotImplementedError, 'TODO'
140+
# Do not allow if IO disabled
141+
unless @instance.io_enabled
142+
raise Workflow::NondeterminismError,
143+
'Cannot perform IO from inside a workflow. If this is known to be safe, ' \
144+
'the code can be run in a Temporalio::Workflow::Unsafe.io_enabled block.'
145+
end
146+
147+
# Use regular Ruby behavior of blocking this thread. There is no Ruby implementation of io_wait we can just
148+
# delegate to at this time (or default scheduler or anything like that), so we had to implement this
149+
# ourselves.
150+
readers = events.nobits?(IO::READABLE) ? nil : [io]
151+
writers = events.nobits?(IO::WRITABLE) ? nil : [io]
152+
priority = events.nobits?(IO::PRIORITY) ? nil : [io]
153+
ready = IO.select(readers, writers, priority, timeout) # steep:ignore
154+
155+
result = 0
156+
unless ready.nil?
157+
result |= IO::READABLE if ready[0]&.include?(io)
158+
result |= IO::WRITABLE if ready[1]&.include?(io)
159+
result |= IO::PRIORITY if ready[2]&.include?(io)
160+
end
161+
result
142162
end
143163

144164
def kernel_sleep(duration = nil)

temporalio/lib/temporalio/internal/worker/workflow_worker.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def initialize(
6767
illegal_workflow_calls:,
6868
workflow_failure_exception_types:,
6969
workflow_payload_codec_thread_pool:,
70+
unsafe_workflow_io_enabled:,
7071
debug_mode:,
7172
on_eviction: nil
7273
)
@@ -109,7 +110,8 @@ def initialize(
109110
end
110111

111112
t
112-
end.freeze
113+
end.freeze,
114+
unsafe_workflow_io_enabled:
113115
)
114116
@state.on_eviction = on_eviction if on_eviction
115117

@@ -184,14 +186,14 @@ def apply_codec_on_payload_visit(payload_or_payloads, &)
184186
class State
185187
attr_reader :workflow_definitions, :bridge_worker, :logger, :metric_meter, :data_converter, :deadlock_timeout,
186188
:illegal_calls, :namespace, :task_queue, :disable_eager_activity_execution,
187-
:workflow_interceptors, :workflow_failure_exception_types
189+
:workflow_interceptors, :workflow_failure_exception_types, :unsafe_workflow_io_enabled
188190

189191
attr_writer :on_eviction
190192

191193
def initialize(
192194
workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout:,
193195
illegal_calls:, namespace:, task_queue:, disable_eager_activity_execution:,
194-
workflow_interceptors:, workflow_failure_exception_types:
196+
workflow_interceptors:, workflow_failure_exception_types:, unsafe_workflow_io_enabled:
195197
)
196198
@workflow_definitions = workflow_definitions
197199
@bridge_worker = bridge_worker
@@ -205,6 +207,7 @@ def initialize(
205207
@disable_eager_activity_execution = disable_eager_activity_execution
206208
@workflow_interceptors = workflow_interceptors
207209
@workflow_failure_exception_types = workflow_failure_exception_types
210+
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled
208211

209212
@running_workflows = {}
210213
@running_workflows_mutex = Mutex.new

temporalio/lib/temporalio/worker.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class Worker
5252
:illegal_workflow_calls,
5353
:workflow_failure_exception_types,
5454
:workflow_payload_codec_thread_pool,
55+
:unsafe_workflow_io_enabled,
5556
:debug_mode
5657
)
5758

@@ -347,6 +348,9 @@ def self.default_illegal_workflow_calls
347348
# @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode within.
348349
# This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block
349350
# execution which is why they need to be run in the background.
351+
# @param unsafe_workflow_io_enabled [Boolean] If false, the default, workflow code that invokes io_wait on the fiber
352+
# scheduler will fail. Instead of setting this to true, users are encouraged to use {Workflow::Unsafe.io_enabled}
353+
# with a block for narrower enabling of IO.
350354
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
351355
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
352356
# `true` or `1`.
@@ -378,6 +382,7 @@ def initialize(
378382
illegal_workflow_calls: Worker.default_illegal_workflow_calls,
379383
workflow_failure_exception_types: [],
380384
workflow_payload_codec_thread_pool: nil,
385+
unsafe_workflow_io_enabled: false,
381386
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
382387
)
383388
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
@@ -412,6 +417,7 @@ def initialize(
412417
illegal_workflow_calls:,
413418
workflow_failure_exception_types:,
414419
workflow_payload_codec_thread_pool:,
420+
unsafe_workflow_io_enabled:,
415421
debug_mode:
416422
).freeze
417423

@@ -483,6 +489,7 @@ def initialize(
483489
illegal_workflow_calls:,
484490
workflow_failure_exception_types:,
485491
workflow_payload_codec_thread_pool:,
492+
unsafe_workflow_io_enabled:,
486493
debug_mode:
487494
)
488495
end

temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ def create_instance(initial_activation, worker_state)
213213
interceptors: worker_state.workflow_interceptors,
214214
disable_eager_activity_execution: worker_state.disable_eager_activity_execution,
215215
illegal_calls: worker_state.illegal_calls,
216-
workflow_failure_exception_types: worker_state.workflow_failure_exception_types
216+
workflow_failure_exception_types: worker_state.workflow_failure_exception_types,
217+
unsafe_workflow_io_enabled: worker_state.unsafe_workflow_io_enabled
217218
)
218219
)
219220
end

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class WorkflowReplayer
3030
:illegal_workflow_calls,
3131
:workflow_failure_exception_types,
3232
:workflow_payload_codec_thread_pool,
33+
:unsafe_workflow_io_enabled,
3334
:debug_mode,
3435
:runtime
3536
)
@@ -69,6 +70,9 @@ class Options; end # rubocop:disable Lint/EmptyClass
6970
# @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode
7071
# within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially
7172
# block execution which is why they need to be run in the background.
73+
# @param unsafe_workflow_io_enabled [Boolean] If false, the default, workflow code that invokes io_wait on the
74+
# fiber scheduler will fail. Instead of setting this to true, users are encouraged to use
75+
# {Workflow::Unsafe.io_enabled} with a block for narrower enabling of IO.
7276
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
7377
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
7478
# `true` or `1`.
@@ -89,6 +93,7 @@ def initialize(
8993
illegal_workflow_calls: Worker.default_illegal_workflow_calls,
9094
workflow_failure_exception_types: [],
9195
workflow_payload_codec_thread_pool: nil,
96+
unsafe_workflow_io_enabled: false,
9297
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase),
9398
runtime: Runtime.default,
9499
&
@@ -106,6 +111,7 @@ def initialize(
106111
illegal_workflow_calls:,
107112
workflow_failure_exception_types:,
108113
workflow_payload_codec_thread_pool:,
114+
unsafe_workflow_io_enabled:,
109115
debug_mode:,
110116
runtime:
111117
).freeze
@@ -237,6 +243,7 @@ def initialize(
237243
illegal_workflow_calls: options.illegal_workflow_calls,
238244
workflow_failure_exception_types: options.workflow_failure_exception_types,
239245
workflow_payload_codec_thread_pool: options.workflow_payload_codec_thread_pool,
246+
unsafe_workflow_io_enabled: options.unsafe_workflow_io_enabled,
240247
debug_mode: options.debug_mode,
241248
on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job } # steep:ignore
242249
)

temporalio/lib/temporalio/workflow.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,13 @@ def self.replaying?
503503
def self.illegal_call_tracing_disabled(&)
504504
Workflow._current.illegal_call_tracing_disabled(&)
505505
end
506+
507+
# Run a block of code with IO enabled. Specifically this allows the `io_wait` call of the fiber scheduler to work.
508+
# Users should be cautious about using this as it can often signify unsafe code. Note, this is often only
509+
# applicable to network code as file IO and most process-based IO does not go through scheduler `io_wait`.
510+
def self.io_enabled(&)
511+
Workflow._current.io_enabled(&)
512+
end
506513
end
507514

508515
# Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new.

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module Temporalio
3434
attr_reader update_handlers: Hash[String?, Workflow::Definition::Update]
3535
attr_reader context_frozen: bool
3636

37+
attr_accessor io_enabled: bool
3738
attr_accessor current_details: String?
3839

3940
def initialize: (Details details) -> void

0 commit comments

Comments
 (0)