-
Notifications
You must be signed in to change notification settings - Fork 133
Avoid token detaching from different context #1153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…was used to attach it
…helpers to test/helpers
ctx = contextvars.copy_context() | ||
span = ctx.run( | ||
self._create_span, | ||
name="temporal:startActivity", | ||
data={"activity": input.activity}, | ||
input=input, | ||
) | ||
handle = ctx.run(self.next.start_activity, input) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, adding more stack trace depth and layers of indirection for the invocation of activities is a bit rough I think. Maybe I don't understand the problem well enough. So there was an issue with span.finish
too, not just .detach
? Without this change here (not discussing other changes later), can we cause a failure (even if not specifically the test in this PR)? What does the failure look like? I was under the impression it was OTel detach, not span.finish. Is OpenAI+OTel span finish somehow doing detach?
Also, assuming this can fail, I think we should be able to run the finish in the copied context but not have to run user code in it because the current context is good enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can create a failure using things from #1136. OpenAIAgentsInstrumentor().instrument()
but it would be a lot to try to include as a test. I'm actually leaning towards a different solution here where we simply close the startActivity span when the handle is created. This is similar to what we do with Otel, and I don't think parenting the activity execution to this really does much. (It will still be parented to the workflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue here is the same underlying issue with our Otel detach. In the OpenAIAgentsInstrumentor
it manages it's own tokens and when/if the Temporal SDK executes span.finish in a different context their detach will raise the error.
Similarly, if we don't run the user code in the same context, I don't expect the span hierarchy to stay intact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The detach thing made sense because it was in a finally
, but this is done via add_done_callback
. Are we saying that add_done_callback
is invoked on GC/GeneratorExit like finally
is? I see now there is a context
parameter for add_done_callback
and it defaults in all Python versions we support, so now I think that add_done_callback
does run in the current context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add_done_callback
should execute the callable when the the Future
is completed, which should include the GC/GeneratorExit scenario. I didn't see the context param before which we could definitely use here. Looks like if the context param is not provided, it captures existing context when calling add_done_callback
and uses that.
temporalio/contrib/opentelemetry.py
Outdated
with self._top_level_workflow_context(success_is_complete=True) as ctx: | ||
if sys.version_info >= (3, 11): | ||
return await asyncio.create_task( | ||
self._execute_workflow(input), context=ctx | ||
) | ||
else: | ||
return await ctx.run(asyncio.create_task, self._execute_workflow(input)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, I am not sure another layer of indirection and stack trace is good here, and am especially scared of a whole new asyncio task. Are there any other possible solutions that will allow us to not create tasks? Can we just alter the finally
to ensure it is peformed in the context somehow? Maybe a less signficant change kind of back to what we had but specifically address just the one line that was the issue, detach
?
(I admit I may not understand the technical details of some of this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an option Alex explored where we specifically check if the context has changed and just not detach. We wanted to explore what it would look like to actually make the context consistent across the whole thing. @VegetarianOrc did we ever find a way to replicate with safe eviction on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any other possible solutions that will allow us to not create tasks?
As Tim mentioned, the other solution that I explored was to just detect if the finally is running in a different task and simply not detach in that case. The routes for this I tested out were comparing thread ids, comparing the current otel context, and comparing the tasks via asyncio.current_task()
.
did we ever find a way to replicate with safe eviction on?
I haven't been able to reproduce it with safe eviction on, but I do generally feel like it's a good idea to have the interceptor guard against this possibility.
Can we just alter the finally to ensure it is performed in the context somehow?
The issue is that user code must execute in the same context that the attach happens in otherwise the values inserted (tracing headers, etc) aren't available so spans aren't linked or parented correctly. In order to run any coroutine with a different context you have to create a new task that takes that context otherwise the coroutine will just copy the current context the moment it's awaited. So in this case ctx.run(self._execute_workflow, input)
has the correct context when creating the coroutine object, but then just returns the coro object, which then inherits the current context when awaited.
If we want to avoid the task, I think the check to ensure that detach won't raise is a reasonable alternative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As Tim mentioned, the other solution that I explored was to just detect if the finally is running in a different task and simply not detach in that case. The routes for this I tested out were comparing thread ids, comparing the current otel context, and comparing the tasks via asyncio.current_task().
If you can replicate in tests, swallowing the detach failure I think is acceptable too. Regardless, I don't think detach must run in a case where we're already GC'ing here. So really, "detach" just needs to be best effort, not affect the rest of the code.
The issue is that user code must execute in the same context that the attach happens in otherwise the values inserted (tracing headers, etc) aren't available so spans aren't linked or parented correctly
Which it is already the case today (it affects the current context), this is only an issue with detach
Also, I bet you can just ctx.run
the detach without ctx.run/create_task'ing everything. I think we should limit the changes to just address the detach part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I bet you can just ctx.run the detach without ctx.run/create_task'ing everything.
I couldn't find a way to store a reference to the current contextvars.Context
to use just on the detach. Without that, the alternative is to copy the current context and ensure that attach, execute, and detach all run in that copied context.
So really, "detach" just needs to be best effort, not affect the rest of the code.
...
I think we should limit the changes to just address the detach part.
Fully on board with this approach as well. It makes sense to me that in the case that the context we ran in is no longer valid, we just don't detach as it's already gone. Just thought it was worth exploring what it looked like to ensure everything was executed in a context where we controlled the lifetime. @tconley1428, does the best effort detach approach make sense to you too? It's certainly much simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't find a way to store a reference to the current contextvars.Context to use just on the detach. Without that, the alternative is to copy the current context and ensure that attach, execute, and detach all run in that copied context.
copy_context
sounds more expensive than it is IIUC. copy_context
is more copy-on-write-as-of-when-called, so effectively it's like a super cheap context snapshot/clone IIUC.
But yeah, swallowing best-effort detach failure I think is best, because detach is just about detaching from context (which is going away anyways because this task/generator is being GC'd), it isn't something like a "flush" IIUC where the absence of such a call could mean data loss.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine in this scenario.
…ot just equal but is the same object
# different contextvars.Context than the one the token was created | ||
# on. As such we do a best effort detach to avoid using a mismatched | ||
# token. | ||
if context is opentelemetry.context.get_current(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
tests/worker/test_workflow.py
Outdated
unpause_and_assert, | ||
workflow_update_exists, | ||
) | ||
from tests.helpers.cache_evitction import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misspelling in the filename
What was changed
The OpenTelemetry and OpenAI Agents interceptors have been updated to avoid detaching context in such a way that does not cause a
<Token> was created in a different Context
error.Why?
There are some situations where cleanup happens in a different
asyncio.Task
(when tasks get garbage collected for instance). When those situations happen thecontextvars.Context
used to create thecontextvars.Token
is no longer available. By confirming that the Otel context is the same as the one we attached, we can do a best effort detach and avoid the error.Checklist
Issues Closed
How was this tested:
A new test was added to force a GeneratorExit error after a task is garbage collected by disabling safe eviction. The test captures OpenTelemetry logs and ensures that the
Failed to detach context
message is not printed.