Skip to content

Commit 5af1ad4

Browse files
authored
Support parentless OTel workflow spans (#244)
1 parent 89945c2 commit 5af1ad4

File tree

6 files changed

+162
-71
lines changed

6 files changed

+162
-71
lines changed

temporalio/lib/temporalio/contrib/open_telemetry.rb

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class TracingInterceptor
2323
# @param tracer [OpenTelemetry::Trace::Tracer] Tracer to use.
2424
# @param header_key [String] Temporal header name to serialize spans to/from. Most users should not change this.
2525
# @param propagator [Object] Propagator to use. Most users should not change this.
26+
# @param always_create_workflow_spans [Boolean] When false, the default, spans are only created in workflows
27+
# when an overarching span from the client is present. In cases of starting a workflow elsewhere, e.g. CLI or
28+
# schedules, a client-created span is not present and workflow spans will not be created. Setting this to true
29+
# will create spans in workflows no matter what, but there is a risk of them being orphans since they may not
30+
# have a parent span after replaying.
2631
def initialize(
2732
tracer,
2833
header_key: '_tracer-data',
@@ -31,11 +36,13 @@ def initialize(
3136
::OpenTelemetry::Trace::Propagation::TraceContext::TextMapPropagator.new,
3237
::OpenTelemetry::Baggage::Propagation::TextMapPropagator.new
3338
]
34-
)
39+
),
40+
always_create_workflow_spans: false
3541
)
3642
@tracer = tracer
3743
@header_key = header_key
3844
@propagator = propagator
45+
@always_create_workflow_spans = always_create_workflow_spans
3946
end
4047

4148
# @!visibility private
@@ -85,6 +92,11 @@ def _with_started_span(
8592
end
8693
end
8794

95+
# @!visibility private
96+
def _always_create_workflow_spans
97+
@always_create_workflow_spans
98+
end
99+
88100
# @!visibility private
89101
class ClientOutbound < Client::Interceptor::Outbound
90102
def initialize(root, next_interceptor)
@@ -423,30 +435,34 @@ def self.completed_span(
423435
even_during_replay: false
424436
)
425437
# Get root interceptor, which also checks if in workflow
426-
root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor]
438+
root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] #: TracingInterceptor?
427439
raise 'Tracing interceptor not configured' unless root
428440

429441
# Do nothing if replaying and not wanted during replay
430442
return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying?
431443

432-
# Do nothing if there is no span on the context. We do not want orphan spans coming from workflows, so we
433-
# require a parent (i.e. a current).
434-
# TODO(cretz): This matches Python behavior but not .NET behavior (which will create no matter what), is that
435-
# ok?
436-
return nil if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID
444+
# If there is no span on the context and the user hasn't opted in to always creating, do not create. This
445+
# prevents orphans if there was no span originally created from the client start-workflow call.
446+
if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID &&
447+
!root._always_create_workflow_spans
448+
return nil
449+
end
437450

438451
# Create attributes, adding user-defined ones
439452
attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id,
440453
'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)
441454

442-
# Create span
443-
time = Temporalio::Workflow.now
444-
timestamp = (time.to_i * 1_000_000_000) + time.nsec
445-
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore
446-
# Record exception if present
447-
span.record_exception(exception) if exception
448-
# Finish the span (returns self)
449-
span.finish(end_timestamp: timestamp)
455+
# Create span with illegal call tracing disabled, because OTel asks for the full exception message
456+
# which uses error highlighting and such which accesses File#path
457+
Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do
458+
time = Temporalio::Workflow.now
459+
timestamp = (time.to_i * 1_000_000_000) + time.nsec
460+
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore
461+
# Record exception if present
462+
span.record_exception(exception) if exception
463+
# Finish the span (returns self)
464+
span.finish(end_timestamp: timestamp)
465+
end
450466
end
451467
end
452468
end

temporalio/sig/temporalio/contrib/open_telemetry.rbs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ module Temporalio
1111
def initialize: (
1212
untyped tracer,
1313
?header_key: String,
14-
?propagator: untyped
14+
?propagator: untyped,
15+
?always_create_workflow_spans: bool
1516
) -> void
1617

1718
def _apply_context_to_headers: (Hash[String, untyped] headers, ?context: untyped) -> void
@@ -23,6 +24,7 @@ module Temporalio
2324
?attributes: Hash[untyped, untyped]?,
2425
?outbound_input: untyped
2526
) { () -> T } -> T
27+
def _always_create_workflow_spans: -> bool
2628

2729
class WorkflowInbound < Worker::Interceptor::Workflow::Inbound
2830
def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void

temporalio/test/contrib/open_telemetry_test.rb

Lines changed: 112 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,12 @@ def update(scenario)
9696
) do
9797
Temporalio::Workflow.execute_local_activity(TestActivity, :fail_first_attempt, start_to_close_timeout: 30)
9898
end
99-
when :child_workflow
100-
# Start a child, send child signal and external signal, finish
99+
when :child_workflow_child_signal
100+
handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal)
101+
handle.signal(TestWorkflow.signal, :mark_finished)
102+
[handle.id, handle.first_execution_run_id, handle.result]
103+
when :child_workflow_external_signal
101104
handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal)
102-
handle.signal(TestWorkflow.signal, :complete)
103105
Temporalio::Workflow.external_workflow_handle(handle.id).signal(TestWorkflow.signal, :mark_finished)
104106
[handle.id, handle.first_execution_run_id, handle.result]
105107
else
@@ -138,11 +140,18 @@ def init_tracer_and_exporter
138140
[tracer, exporter]
139141
end
140142

141-
def trace(tracer_and_exporter: init_tracer_and_exporter, &)
143+
def trace(
144+
tracer_and_exporter: init_tracer_and_exporter,
145+
always_create_workflow_spans: false,
146+
check_root: true,
147+
&
148+
)
142149
tracer, exporter = tracer_and_exporter
143150

144151
# Make client with interceptors
145-
interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(tracer)
152+
interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(
153+
tracer, always_create_workflow_spans:
154+
)
146155
new_options = env.client.options.with(interceptors: [interceptor])
147156
client = Temporalio::Client.new(**new_options.to_h) # steep:ignore
148157

@@ -153,13 +162,22 @@ def trace(tracer_and_exporter: init_tracer_and_exporter, &)
153162

154163
# Convert spans, confirm there is only the outer, and return children
155164
spans = ExpectedSpan.from_span_data(exporter.finished_spans)
156-
assert_equal 1, spans.size
157-
assert_equal 'root', spans.first&.name
165+
if check_root
166+
assert_equal 1, spans.size
167+
assert_equal 'root', spans.first&.name
168+
end
158169
spans.first
159170
end
160171

161-
def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &)
162-
trace(tracer_and_exporter:) do |client|
172+
def trace_workflow(
173+
scenario,
174+
tracer_and_exporter: init_tracer_and_exporter,
175+
start_with_untraced_client: false,
176+
always_create_workflow_spans: false,
177+
check_root: true,
178+
&
179+
)
180+
trace(tracer_and_exporter:, always_create_workflow_spans:, check_root:) do |client|
163181
# Must capture and attach outer context
164182
outer_context = OpenTelemetry::Context.current
165183
attach_token = nil
@@ -169,7 +187,8 @@ def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &)
169187
client:,
170188
activities: [TestActivity.new(tracer_and_exporter.first)],
171189
# Have to reattach outer context inside worker run to check outer span
172-
on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) }
190+
on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) },
191+
start_workflow_client: start_with_untraced_client ? env.client : client
173192
) do |handle|
174193
yield handle
175194
ensure
@@ -398,42 +417,61 @@ def test_client_fail
398417
end
399418

400419
def test_child_and_external
401-
exp_root = ExpectedSpan.new(name: 'root')
402-
act_root = trace_workflow(:wait_on_signal) do |handle|
403-
exp_cl_attrs = { 'temporalWorkflowID' => handle.id }
404-
exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id })
405-
exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs)
406-
exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs)
407-
408-
# Wait for task completion so update isn't accidentally first before run
409-
assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) }
410-
411-
# Update calls child and sends signals to it in two ways
412-
child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update,
413-
:child_workflow, id: 'my-update-id')
414-
exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update',
415-
attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }))
416-
# Expected span for update
417-
exp_hnd_update = exp_start_wf.add_child(
418-
name: 'HandleUpdate:update',
419-
attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }),
420-
links: [exp_update]
421-
)
422-
# Expected for children
423-
exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id }
424-
exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs)
425-
exp_child_start
426-
.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
427-
.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
428-
# Two signals we send to the child
429-
exp_sig_child = exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs)
430-
exp_sig_ext = exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs)
431-
exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_child])
432-
exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_ext])
433-
434-
assert_equal 'workflow-done', child_result
420+
# We have to test child signal and external signal separately because sending both back-to-back can result in
421+
# rare cases where one is delivered before the other (yes, even if you wait on the first to get an initiated
422+
# event)
423+
%i[child_workflow_child_signal child_workflow_external_signal].each do |scenario|
424+
exp_root = ExpectedSpan.new(name: 'root')
425+
act_root = trace_workflow(:wait_on_signal) do |handle|
426+
exp_cl_attrs = { 'temporalWorkflowID' => handle.id }
427+
exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id })
428+
exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs)
429+
exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs)
430+
431+
# Wait for task completion so update isn't accidentally first before run
432+
assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) }
433+
434+
# Update starts a child and sends it a single signal, in the way selected by the scenario
435+
child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update,
436+
scenario, id: 'my-update-id')
437+
exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update',
438+
attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }))
439+
# Expected span for update
440+
exp_hnd_update = exp_start_wf.add_child(
441+
name: 'HandleUpdate:update',
442+
attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }),
443+
links: [exp_update]
444+
)
445+
# Expected for children
446+
exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id }
447+
exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs)
448+
exp_child_start
449+
.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
450+
.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs)
451+
452+
# There are cases where signal comes _before_ start and cases where signal comes _after_, and server gives us
453+
# no way of knowing that a child _actually_ began running, so we check whether task completed comes before
454+
# signal
455+
assert_equal 'workflow-done', child_result
456+
child_events = env.client.workflow_handle(child_id.to_s).fetch_history_events.to_a
457+
signal_comes_first = child_events.index(&:workflow_execution_signaled_event_attributes).to_i <
458+
child_events.index(&:workflow_task_completed_event_attributes).to_i
459+
# Signal we send to the child
460+
exp_sig = if scenario == :child_workflow_child_signal
461+
exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs)
462+
else
463+
exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs)
464+
end
465+
exp_child_start.add_child(
466+
name: 'HandleSignal:signal',
467+
attributes: exp_child_run_attrs,
468+
links: [exp_sig],
469+
insert_at: signal_comes_first ? 0 : 1
470+
)
471+
end
472+
assert_equal exp_root.to_s_indented, act_root.to_s_indented,
473+
"Expected:\n#{exp_root.to_s_indented}\nActual:#{act_root.to_s_indented}"
435474
end
436-
assert_equal exp_root.to_s_indented, act_root.to_s_indented
437475
end
438476

439477
def test_continue_as_new
@@ -458,6 +496,29 @@ def test_continue_as_new
458496
assert_equal exp_root.to_s_indented, act_root.to_s_indented
459497
end
460498

499+
def test_always_create_workflow_spans
500+
# Untraced client has no spans by default
501+
act = trace_workflow(:complete, start_with_untraced_client: true, check_root: false) do |handle|
502+
assert_equal 'workflow-done', handle.result
503+
end
504+
assert_empty act.children
505+
506+
# Untraced client still gets workflow spans when always_create_workflow_spans is enabled
507+
exp_root = ExpectedSpan.new(name: 'root')
508+
act = trace_workflow(
509+
:complete,
510+
start_with_untraced_client: true,
511+
always_create_workflow_spans: true,
512+
check_root: false
513+
) do |handle|
514+
exp_attrs = { 'temporalWorkflowID' => handle.id, 'temporalRunID' => handle.result_run_id }
515+
exp_run_wf = exp_root.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_attrs)
516+
exp_run_wf.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_attrs)
517+
assert_equal 'workflow-done', handle.result
518+
end
519+
assert_equal exp_root.children.first&.to_s_indented, act.to_s_indented
520+
end
521+
461522
ExpectedSpan = Data.define(:name, :children, :attributes, :links, :exception_message) # rubocop:disable Layout/ClassStructure
462523

463524
class ExpectedSpan
@@ -493,13 +554,16 @@ def self.from_span_data(all_spans)
493554
end
494555

495556
def initialize(name:, children: [], attributes: {}, links: [], exception_message: nil)
496-
children = children.to_set
497557
super
498558
end
499559

500-
def add_child(name:, attributes: {}, links: [], exception_message: nil)
560+
def add_child(name:, attributes: {}, links: [], exception_message: nil, insert_at: nil)
501561
span = ExpectedSpan.new(name:, attributes:, links:, exception_message:)
502-
children << span
562+
if insert_at.nil?
563+
children << span
564+
else
565+
children.insert(insert_at, span)
566+
end
503567
span
504568
end
505569

temporalio/test/sig/contrib/open_telemetry_test.rbs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,16 @@ module Contrib
22
class OpenTelemetryTest < Test
33
def init_tracer_and_exporter: -> [untyped, untyped]
44
def trace: (
5-
?tracer_and_exporter: [untyped, untyped]
5+
?tracer_and_exporter: [untyped, untyped],
6+
?always_create_workflow_spans: bool,
7+
?check_root: bool
68
) { (Temporalio::Client) -> void } -> untyped
79
def trace_workflow: (
810
Symbol scenario,
9-
?tracer_and_exporter: [untyped, untyped]
11+
?tracer_and_exporter: [untyped, untyped],
12+
?start_with_untraced_client: bool,
13+
?always_create_workflow_spans: bool,
14+
?check_root: bool
1015
) { (Temporalio::Client::WorkflowHandle) -> void } -> untyped
1116

1217
class ExpectedSpan
@@ -30,7 +35,8 @@ module Contrib
3035
name: String,
3136
?attributes: Hash[untyped, untyped],
3237
?links: Array[ExpectedSpan],
33-
?exception_message: String?
38+
?exception_message: String?,
39+
?insert_at: Integer?
3440
) -> ExpectedSpan
3541

3642
def to_s_indented: (?indent: String) -> String

temporalio/test/sig/workflow_utils.rbs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ module WorkflowUtils
1717
?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum,
1818
?max_heartbeat_throttle_interval: Float,
1919
?task_timeout: duration?,
20-
?on_worker_run: Proc?
20+
?on_worker_run: Proc?,
21+
?start_workflow_client: Temporalio::Client
2122
) -> Object? |
2223
[T] (
2324
singleton(Temporalio::Workflow::Definition) workflow,
@@ -37,7 +38,8 @@ module WorkflowUtils
3738
?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum,
3839
?max_heartbeat_throttle_interval: Float,
3940
?task_timeout: duration?,
40-
?on_worker_run: Proc?
41+
?on_worker_run: Proc?,
42+
?start_workflow_client: Temporalio::Client
4143
) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T
4244

4345
def assert_eventually_task_fail: (

temporalio/test/workflow_utils.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ def execute_workflow(
2929
max_heartbeat_throttle_interval: 60.0,
3030
task_timeout: nil,
3131
interceptors: [],
32-
on_worker_run: nil
32+
on_worker_run: nil,
33+
start_workflow_client: client
3334
)
3435
worker = Temporalio::Worker.new(
3536
client:,
@@ -45,7 +46,7 @@ def execute_workflow(
4546
)
4647
worker.run do
4748
on_worker_run&.call
48-
handle = client.start_workflow(
49+
handle = start_workflow_client.start_workflow(
4950
workflow,
5051
*args,
5152
id:,

0 commit comments

Comments
 (0)