Skip to content

Commit 2115ff7

Browse files
authored
Workflow and activity instance access from context (#207)
Fixes #180
1 parent c79cc61 commit 2115ff7

File tree

15 files changed

+141
-18
lines changed

15 files changed

+141
-18
lines changed

temporalio/lib/temporalio/activity/context.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ def info
4848
raise NotImplementedError
4949
end
5050

51+
# @return [Object, nil] Activity class instance. This should always be present except for advanced cases where the
52+
# definition was manually created without any instance getter/creator.
53+
def instance
54+
raise NotImplementedError
55+
end
56+
5157
# Record a heartbeat on the activity.
5258
#
5359
# Heartbeats should be used for all non-immediately-returning, non-local activities and they are required to

temporalio/lib/temporalio/activity/definition.rb

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ class Info
105105
# @return [String, Symbol, nil] Name of the activity, or nil if the activity is dynamic.
106106
attr_reader :name
107107

108-
# @return [Proc] Proc for the activity.
108+
# @return [Object, Proc, nil] The pre-created instance or the proc to create/return it.
109+
attr_reader :instance
110+
111+
# @return [Proc] Proc for the activity. Should use {Context#instance} to access the instance.
109112
attr_reader :proc
110113

111114
# @return [Symbol] Name of the executor. Default is `:default`.
@@ -134,18 +137,20 @@ def self.from_activity(activity)
134137
details = activity._activity_definition_details
135138
new(
136139
name: details[:activity_name],
140+
instance: proc { activity.new },
137141
executor: details[:activity_executor],
138142
cancel_raise: details[:activity_cancel_raise],
139143
raw_args: details[:activity_raw_args]
140-
) { |*args| activity.new.execute(*args) } # Instantiate and call
144+
) { |*args| Context.current.instance&.execute(*args) }
141145
when Definition
142146
details = activity.class._activity_definition_details
143147
new(
144148
name: details[:activity_name],
149+
instance: activity,
145150
executor: details[:activity_executor],
146151
cancel_raise: details[:activity_cancel_raise],
147152
raw_args: details[:activity_raw_args]
148-
) { |*args| activity.execute(*args) } # Just and call
153+
) { |*args| Context.current.instance&.execute(*args) }
149154
when Info
150155
activity
151156
else
@@ -156,12 +161,21 @@ def self.from_activity(activity)
156161
# Manually create activity definition info. Most users will use an instance/class of {Definition}.
157162
#
158163
# @param name [String, Symbol, nil] Name of the activity or nil for dynamic activity.
164+
# @param instance [Object, Proc, nil] The pre-created instance or the proc to create/return it.
159165
# @param executor [Symbol] Name of the executor.
160166
# @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation.
161167
# @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments.
162168
# @yield Use this block as the activity.
163-
def initialize(name:, executor: :default, cancel_raise: true, raw_args: false, &block)
169+
def initialize(
170+
name:,
171+
instance: nil,
172+
executor: :default,
173+
cancel_raise: true,
174+
raw_args: false,
175+
&block
176+
)
164177
@name = name
178+
@instance = instance
165179
raise ArgumentError, 'Must give block' unless block_given?
166180

167181
@proc = block

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def execute_activity(task_token, defn, start)
210210
)
211211
Activity::Context._current_executor&.set_activity_context(defn, activity)
212212
set_running_activity(task_token, activity)
213-
run_activity(activity, input)
213+
run_activity(defn, activity, input)
214214
rescue Exception => e # rubocop:disable Lint/RescueException We are intending to catch everything here
215215
@scoped_logger.warn("Failed starting or sending completion for activity #{start.activity_type}")
216216
@scoped_logger.warn(e)
@@ -236,8 +236,11 @@ def execute_activity(task_token, defn, start)
236236
remove_running_activity(task_token)
237237
end
238238

239-
def run_activity(activity, input)
239+
def run_activity(defn, activity, input)
240240
result = begin
241+
# Create the instance. We choose to do this before interceptors so that it is available in the interceptor.
242+
activity.instance = defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance # steep:ignore
243+
241244
# Build impl with interceptors
242245
# @type var impl: Temporalio::Worker::Interceptor::Activity::Inbound
243246
impl = InboundImplementation.new(self)
@@ -293,7 +296,7 @@ def run_activity(activity, input)
293296

294297
class RunningActivity < Activity::Context
295298
attr_reader :info, :cancellation, :worker_shutdown_cancellation, :payload_converter, :logger
296-
attr_accessor :_outbound_impl, :_server_requested_cancel
299+
attr_accessor :instance, :_outbound_impl, :_server_requested_cancel
297300

298301
def initialize( # rubocop:disable Lint/MissingSuper
299302
info:,

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ def info
122122
@instance.info
123123
end
124124

125+
def instance
126+
@instance.instance
127+
end
128+
125129
def initialize_continue_as_new_error(error)
126130
@outbound.initialize_continue_as_new_error(
127131
Temporalio::Worker::Interceptor::Workflow::InitializeContinueAsNewErrorInput.new(error:)

temporalio/lib/temporalio/testing/activity_environment.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ def run(activity, *args)
8080
Activity::Context._current_executor = executor
8181
executor.set_activity_context(defn, Context.new(
8282
info: @info.dup,
83+
instance:
84+
defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance,
8385
on_heartbeat: @on_heartbeat,
8486
cancellation: @cancellation,
8587
worker_shutdown_cancellation: @worker_shutdown_cancellation,
@@ -102,17 +104,19 @@ def run(activity, *args)
102104

103105
# @!visibility private
104106
class Context < Activity::Context
105-
attr_reader :info, :cancellation, :worker_shutdown_cancellation, :payload_converter, :logger
107+
attr_reader :info, :instance, :cancellation, :worker_shutdown_cancellation, :payload_converter, :logger
106108

107109
def initialize( # rubocop:disable Lint/MissingSuper
108-
info: ActivityEnvironment.default_info,
109-
on_heartbeat: nil,
110-
cancellation: Cancellation.new,
111-
worker_shutdown_cancellation: Cancellation.new,
112-
payload_converter: Converters::PayloadConverter.default,
113-
logger: Logger.new(nil)
110+
info:,
111+
instance:,
112+
on_heartbeat:,
113+
cancellation:,
114+
worker_shutdown_cancellation:,
115+
payload_converter:,
116+
logger:
114117
)
115118
@info = info
119+
@instance = instance
116120
@on_heartbeat = on_heartbeat
117121
@cancellation = cancellation
118122
@worker_shutdown_cancellation = worker_shutdown_cancellation

temporalio/lib/temporalio/workflow.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ def self.info
220220
_current.info
221221
end
222222

223+
# @return [Definition, nil] Workflow class instance. This should always be present except in
224+
# {Worker::Interceptor::Workflow::Inbound.init} where it will be nil.
225+
def self.instance
226+
_current.instance
227+
end
228+
223229
# @return [Logger] Logger for the workflow. This is a scoped logger that automatically appends workflow details to
224230
# every log and takes care not to log during replay.
225231
def self.logger

temporalio/sig/temporalio/activity/context.rbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ module Temporalio
99
def self._current_executor=: (Worker::ActivityExecutor? executor) -> void
1010

1111
def info: -> Info
12+
def instance: -> Definition?
1213
def heartbeat: (*Object? details) -> void
1314
def cancellation: -> Cancellation
1415
def worker_shutdown_cancellation: -> Cancellation

temporalio/sig/temporalio/activity/definition.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module Temporalio
1818

1919
class Info
2020
attr_reader name: String | Symbol | nil
21+
attr_reader instance: Object | Proc | nil
2122
attr_reader proc: Proc
2223
attr_reader executor: Symbol
2324
attr_reader cancel_raise: bool
@@ -27,6 +28,7 @@ module Temporalio
2728

2829
def initialize: (
2930
name: String | Symbol | nil,
31+
?instance: Object | Proc | nil,
3032
?executor: Symbol,
3133
?cancel_raise: bool,
3234
?raw_args: bool

temporalio/sig/temporalio/internal/worker/activity_worker.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ module Temporalio
2121

2222
def execute_activity: (String task_token, Activity::Definition::Info defn, untyped start) -> void
2323
def run_activity: (
24+
Activity::Definition::Info defn,
2425
RunningActivity activity,
2526
Temporalio::Worker::Interceptor::Activity::ExecuteInput input
2627
) -> void
2728

2829
class RunningActivity < Activity::Context
30+
attr_accessor instance: Activity::Definition?
2931
attr_accessor _outbound_impl: Temporalio::Worker::Interceptor::Activity::Outbound?
3032
attr_accessor _server_requested_cancel: bool
3133

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ module Temporalio
4040

4141
def add_command: (untyped command) -> void
4242

43-
def instance: -> Object
43+
def instance: -> Temporalio::Workflow::Definition
4444

4545
def search_attributes: -> SearchAttributes
4646

@@ -58,7 +58,7 @@ module Temporalio
5858

5959
def activate_internal: (untyped activation) -> untyped
6060

61-
def create_instance: -> Object
61+
def create_instance: -> Temporalio::Workflow::Definition
6262

6363
def apply: (untyped job) -> void
6464

0 commit comments

Comments
 (0)