1
+ from functools import wraps
2
+ from typing import Optional , Union , AsyncGenerator
3
+ import time
4
+
5
+ from vllm .entrypoints .openai .protocol import CompletionRequest , ErrorResponse
6
+ from vllm .tracing import extract_trace_context , SpanAttributes , init_tracer
7
+ from vllm .v1 .request import Request
8
+ from opentelemetry import trace
9
+
10
# Module-level tracer shared by the tracing decorators in this file.
# Both arguments to init_tracer are fixed literals: the first looks like the
# dotted name of the instrumented module, the second like a collector
# endpoint URL (presumably OTLP on localhost -- confirm against init_tracer).
_TRACER_NAME = "vllm.entrypoints.openai.serving_completion"
_TRACER_ENDPOINT = "http://localhost:4317"

tracer = init_tracer(_TRACER_NAME, _TRACER_ENDPOINT)
13
+
14
def trace_streaming_completion(tracer_attr='tracer'):
    """
    Decorator for tracing streaming completion methods.

    A parent span named ``chunkwise_beam_completion`` is started before the
    wrapped coroutine runs and is kept open while the async generator it
    returns is consumed. Chunk iteration happens inside a child span named
    ``chunk_generation``.

    Args:
        tracer_attr: Accepted for interface compatibility but not read; the
            module-level ``tracer`` is always used.

    Returns:
        A decorator for ``async def method(self, request, raw_request=None)``.
        The decorated method returns ``ErrorResponse`` objects (and any other
        result that is not async-iterable) unchanged, and wraps
        async-iterable results in a traced async generator.
    """

    def decorator(func):
        # Annotations are written as strings so they are not evaluated when
        # the decorator is applied.
        @wraps(func)
        async def wrapper(self, request: "CompletionRequest",
                          raw_request: "Optional[Request]" = None):
            # Continue the caller's trace when the request headers carry one.
            ctx = (extract_trace_context(dict(raw_request.headers))
                   if raw_request else None)
            parent_span = tracer.start_span("chunkwise_beam_completion",
                                            context=ctx)

            try:
                # Enter use_span for real so the parent span is current while
                # func runs. end_on_exit=False keeps it open for the stream.
                # use_span records an escaping Exception on the span (its
                # default behaviour) before re-raising it.
                with trace.use_span(parent_span, end_on_exit=False):
                    # max_tokens may be unset; None is not a valid attribute
                    # value, so skip it in that case.
                    if request.max_tokens is not None:
                        parent_span.set_attribute(
                            SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS,
                            request.max_tokens)
                    parent_span.set_attribute(
                        SpanAttributes.GEN_AI_REQUEST_N, request.n)
                    if hasattr(request, "request_id"):
                        parent_span.set_attribute(
                            SpanAttributes.GEN_AI_REQUEST_ID,
                            request.request_id)

                    result = await func(self, request, raw_request)
            except BaseException:
                # Also covers cancellation: never leave the span open.
                parent_span.end()
                raise

            # Nothing to stream: close the span now and hand the result back.
            if (isinstance(result, ErrorResponse)
                    or not hasattr(result, "__aiter__")):
                parent_span.end()
                return result

            async def traced_generator():
                try:
                    with trace.use_span(parent_span, end_on_exit=False):
                        with tracer.start_as_current_span("chunk_generation"):
                            async for item in result:
                                yield item
                finally:
                    # Runs on normal exhaustion, on error, and when the
                    # consumer closes the stream early.
                    parent_span.end()

            return traced_generator()

        return wrapper

    return decorator
62
+
63
+
64
def trace_async_method(span_name: Optional[str] = None, tracer_attr='tracer'):
    """
    Decorator for tracing regular async methods.

    The wrapped coroutine runs inside a span that is current for the duration
    of the call. The elapsed time is stored on the span as the
    ``execution_time_ms`` attribute, whether the call succeeds or raises.

    Args:
        span_name: Name for the span; falls back to the wrapped function's
            ``__name__`` when empty or ``None``.
        tracer_attr: Accepted for interface compatibility but not read; the
            module-level ``tracer`` is always used.

    Returns:
        A decorator that wraps an ``async def method(self, *args, **kwargs)``.
    """

    def decorator(func):
        # The span name is fixed at decoration time.
        name = span_name or func.__name__

        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            # start_as_current_span records an escaping exception on the span
            # by default, so it is not recorded a second time here.
            with tracer.start_as_current_span(name) as span:
                # perf_counter is monotonic, unlike the wall clock.
                started = time.perf_counter()
                try:
                    return await func(self, *args, **kwargs)
                finally:
                    span.set_attribute(
                        "execution_time_ms",
                        (time.perf_counter() - started) * 1000)

        return wrapper

    return decorator
0 commit comments