@@ -472,9 +472,10 @@ def _finalize_and_register_atom(
472
472
callsite_deps : list [str ],
473
473
call_expr : ast .AST | list [ast .stmt ],
474
474
* ,
475
- return_target : str | list [str ] | tuple [str , ...] | ast .expr | None = None
475
+ return_target : str | list [str ] | tuple [str , ...] | ast .expr | None = None ,
476
+ callsite_node : ast .AST | None = None ,
476
477
) -> ast .FunctionDef :
477
- func = self ._build_atom_function (atom_name , component_id , callsite_deps , call_expr , return_target = return_target )
478
+ func = self ._build_atom_function (atom_name , component_id , callsite_deps , call_expr , return_target = return_target , callsite_node = callsite_node )
478
479
self ._finalize_atom_deps (func )
479
480
self ._current_frame .generated_atoms .append (func )
480
481
return func
@@ -647,7 +648,8 @@ def visit_Name(self, node): # noqa: N802
647
648
component_id ,
648
649
callsite_deps ,
649
650
assign_stmt ,
650
- return_target = ast .Name (id = target .id , ctx = ast .Load ())
651
+ return_target = ast .Name (id = target .id , ctx = ast .Load ()),
652
+ callsite_node = stmt
651
653
)
652
654
653
655
def _lift_output_stream_stmt (self , stmt : ast .Expr , component_id : str , atom_name : str , stream : str ) -> None :
@@ -690,7 +692,7 @@ def _lift_output_stream_stmt(self, stmt: ast.Expr, component_id: str, atom_name:
690
692
691
693
try :
692
694
call_and_return = ast .parse (source ).body
693
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , call_and_return )
695
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , call_and_return , callsite_node = stmt )
694
696
except SyntaxError as e :
695
697
self ._safe_register_error (
696
698
lineno = self ._get_stable_lineno (stmt , "output stream code generation" ),
@@ -743,7 +745,7 @@ def _lift_return_renderable_call(
743
745
]
744
746
)
745
747
746
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , wrapped_call )
748
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , wrapped_call , callsite_node = stmt )
747
749
748
750
def _lift_side_effect_stmt (self , stmt : ast .Expr ) -> None :
749
751
"""
@@ -821,7 +823,7 @@ def visit_Name(self, node: ast.Name): # noqa: N802
821
823
822
824
component_id , atom_name = self .generate_component_and_atom_name ("sideeffect" , stmt )
823
825
atom_body = [ast .Expr (value = patched_call )]
824
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , atom_body )
826
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , atom_body , callsite_node = stmt )
825
827
logger .debug ('[AST] lifted side effect statement into atom %s -> %s' , component_id , atom_name )
826
828
827
829
except Exception as e :
@@ -838,7 +840,6 @@ def _lift_blackbox_function_call(
838
840
scoped_map : dict [str , str ],
839
841
variable_map : dict [str , str ],
840
842
) -> None :
841
- logger .debug ('[DEBUG] enter _lift_blackbox_function_call' )
842
843
843
844
component_id , atom_name = self .generate_component_and_atom_name (func_name , stmt )
844
845
@@ -934,7 +935,8 @@ def _lift_blackbox_function_call(
934
935
component_id ,
935
936
callsite_deps ,
936
937
body ,
937
- return_target = return_target
938
+ return_target = return_target ,
939
+ callsite_node = stmt
938
940
)
939
941
940
942
def _lift_producer_stmt (self , stmt : ast .Assign , pending_assignments : list [ast .Assign ], variable_map : dict [str , str ]) -> None :
@@ -973,7 +975,7 @@ def _lift_producer_stmt(self, stmt: ast.Assign, pending_assignments: list[ast.As
973
975
974
976
return_target : str | list [str ] | None = None
975
977
if isinstance (stmt .targets [0 ], ast .Tuple | ast .List ):
976
- component_id , atom_name = self .generate_component_and_atom_name ("producer" )
978
+ component_id , atom_name = self .generate_component_and_atom_name ("producer" , stmt )
977
979
self ._current_frame .tuple_returning_atoms .add (atom_name )
978
980
979
981
unpacked_vars = [
@@ -1095,7 +1097,8 @@ def _lift_producer_stmt(self, stmt: ast.Assign, pending_assignments: list[ast.As
1095
1097
component_id ,
1096
1098
deps ,
1097
1099
[patched_stmt ],
1098
- return_target = return_target
1100
+ return_target = return_target ,
1101
+ callsite_node = stmt
1099
1102
)
1100
1103
logger .info ("[AST] lifted subscript assignment into atom %s -> %s" , component_id , new_atom_name )
1101
1104
return
@@ -1135,7 +1138,7 @@ def _lift_producer_stmt(self, stmt: ast.Assign, pending_assignments: list[ast.As
1135
1138
self ._register_variable_bindings (stmt , atom_name )
1136
1139
logger .debug (f"[AST] Lifted producer: { atom_name = } { callsite_deps = } " )
1137
1140
1138
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , patched_expr , return_target = return_target )
1141
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , patched_expr , return_target = return_target , callsite_node = stmt )
1139
1142
self ._current_frame .variable_to_atom .update (variable_map )
1140
1143
1141
1144
def _lift_consumer_stmt (self , stmt : ast .Expr , * , component_id : str | None = None , atom_name : str | None = None ) -> ast .Expr :
@@ -1215,7 +1218,7 @@ def visit_Name(self, node: ast.Name): # noqa: N802
1215
1218
patched_expr = TupleAwareReplacer ().visit (copy .deepcopy (expr ))
1216
1219
ast .fix_missing_locations (patched_expr )
1217
1220
1218
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , patched_expr )
1221
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , patched_expr , callsite_node = stmt )
1219
1222
1220
1223
# Return the rewritten expression as a call to the generated atom
1221
1224
callsite = self ._make_callsite (atom_name , callsite_deps )
@@ -1330,7 +1333,7 @@ def _try_lift_display_renderer(
1330
1333
],
1331
1334
)
1332
1335
1333
- self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , renderer_call )
1336
+ self ._finalize_and_register_atom (atom_name , component_id , callsite_deps , renderer_call , callsite_node = stmt )
1334
1337
1335
1338
#logger.debug(f"[DEBUG] Replacing .show call with call to: {renderer_fn.__name__}({object_arg=}, {component_id=})")
1336
1339
@@ -1677,7 +1680,7 @@ def resolve_attribute(node: ast.AST) -> str | None:
1677
1680
if logger .isEnabledFor (logging .DEBUG ):
1678
1681
logger .debug (f"[AST] Unable to resolve function name for call: { ast .dump (call )} " )
1679
1682
else :
1680
- logger .warning (f "[AST] Unable to resolve function name for call. Enable debug logging for ast dump." )
1683
+ logger .warning ("[AST] Unable to resolve function name for call. Enable debug logging for ast dump." )
1681
1684
return "<unknown>"
1682
1685
1683
1686
def _has_runtime_execution (self , body : list [ast .stmt ]) -> bool :
@@ -2163,7 +2166,7 @@ def visit_JoinedStr(self, node: ast.JoinedStr) -> ast.JoinedStr: # noqa: N802
2163
2166
self ._current_frame .tuple_variable_index ,
2164
2167
).visit (call )
2165
2168
2166
- def visit_Assign (self , node : ast .Assign ) -> ast .AST :
2169
+ def visit_Assign (self , node : ast .Assign ) -> ast .AST : # noqa: N802
2167
2170
# Only support simple single target assignments for now
2168
2171
if len (node .targets ) == 1 and isinstance (node .targets [0 ], ast .Name ):
2169
2172
varname = node .targets [0 ].id
@@ -2265,9 +2268,10 @@ def visit_FunctionDef(self, node: ast.FunctionDef) -> ast.FunctionDef: # noqa: N
2265
2268
return node
2266
2269
2267
2270
# Attach atom decorator
2268
- callsite_hint = f"{ self .filename } :{ getattr (node , 'lineno' , 0 )} "
2269
- atom_name = generate_stable_id ("_auto_atom" , callsite_hint = callsite_hint )
2270
- decorator = self ._create_workflow_atom_decorator (atom_name , callsite_deps = [])
2271
+ callsite_metadata = self ._build_callsite_metadata (node , self .filename )
2272
+ atom_name = generate_stable_id ("_auto_atom" , callsite_hint = callsite_metadata ["callsite_hint" ])
2273
+ decorator = self ._create_workflow_atom_decorator (atom_name , callsite_deps = [], callsite_metadata = callsite_metadata )
2274
+
2271
2275
node .decorator_list .insert (0 , decorator )
2272
2276
node .generated_atom_name = atom_name
2273
2277
self .atoms .append (atom_name )
@@ -2441,47 +2445,51 @@ def _build_atom_function(
2441
2445
callsite_deps : list [str ],
2442
2446
call_expr : ast .AST | list [ast .stmt ],
2443
2447
* ,
2444
- return_target : str | list [str ] | tuple [str , ...] | ast .expr | None = None
2448
+ return_target : str | list [str ] | tuple [str , ...] | ast .expr | None = None ,
2449
+ callsite_node : ast .AST | None = None ,
2445
2450
) -> ast .FunctionDef :
2446
2451
"""
2447
2452
Constructs a reactive atom function from a lifted expression or component call.
2448
2453
2449
2454
The generated function will:
2450
2455
- Accept `param0`, `param1`, ... as arguments for each reactive dependency
2451
- - Wrap the user expression(s) in a function body
2452
- - Return the computed value (or specified `return_target`)
2453
- - Be decorated with ` @workflow.atom(name= ..., dependencies=[...])`
2456
+ - Wrap the normalized expression(s) in a function body
2457
+ - Return the computed value, using `return_target` if provided, or appending a default return otherwise
2458
+ - Attach the @workflow.atom(...) decorator with name , dependencies, and callsite metadata
2454
2459
2455
2460
Supports:
2456
2461
- Named assignments: `x = ...`
2457
2462
- Subscript assignments: `param0["col"] = ...`
2458
2463
- Blocks of statements: `list[ast.stmt]`
2459
- - Raw expressions: a value to directly `return `
2464
+ - Single expressions wrapped as `ast.Expr `
2460
2465
- Explicit return override via `return_target`
2466
+
2467
+ Any other AST node types passed as `call_expr` will result in a safe transformation error.
2461
2468
"""
2462
2469
2463
2470
# Create function parameters: (param0, param1, ...)
2464
2471
args_ast = self ._make_param_args (callsite_deps )
2465
2472
2466
- # Build decorator
2467
- decorator = self ._create_workflow_atom_decorator (atom_name , callsite_deps )
2468
-
2469
- # Build function body
2473
+ # Normalize call_expr into a body list
2470
2474
if isinstance (call_expr , list ):
2471
2475
body = call_expr
2472
- elif isinstance (call_expr , ast .Assign ):
2473
- body = [call_expr ]
2474
- elif isinstance (call_expr , ast .Expr ):
2476
+ elif isinstance (call_expr , ast .Assign ) or isinstance (call_expr , ast .Expr ):
2475
2477
body = [call_expr ]
2476
2478
else :
2477
- # e.g. raw expression to return directly
2478
- return_stmt = ast .Return (value = call_expr )
2479
- return ast .FunctionDef (
2480
- name = atom_name ,
2481
- args = args_ast ,
2482
- body = [return_stmt ],
2483
- decorator_list = [decorator ],
2479
+ self ._safe_register_error (
2480
+ node = call_expr ,
2481
+ message = f"Unexpected AST node type in _build_atom_function: { type (call_expr ).__name__ } " ,
2482
+ component_id = component_id ,
2483
+ atom_name = atom_name ,
2484
2484
)
2485
+ return None
2486
+
2487
+ callsite_metadata = self ._build_callsite_metadata (callsite_node , self .filename )
2488
+ decorator = self ._create_workflow_atom_decorator (
2489
+ atom_name ,
2490
+ callsite_deps ,
2491
+ callsite_metadata = callsite_metadata
2492
+ )
2485
2493
2486
2494
# Append appropriate return statement
2487
2495
if isinstance (return_target , str ):
@@ -2512,16 +2520,24 @@ def _build_atom_function(
2512
2520
decorator_list = [decorator ],
2513
2521
)
2514
2522
2515
- def _create_workflow_atom_decorator (self , atom_name : str , callsite_deps : list [str ]) -> ast .Call :
2523
+ def _create_workflow_atom_decorator (self , atom_name : str , callsite_deps : list [str ], callsite_metadata : dict | None = None ) -> ast .Call :
2516
2524
"""
2517
2525
Constructs a decorator expression for @workflow.atom(...).
2518
2526
2527
+ Includes metadata such as the atom's name, its dependency list, and optionally,
2528
+ source level callsite information (filename, line number, and source line).
2529
+
2530
+ The `callsite_metadata` is propagated into the atom definition to enable more
2531
+ precise runtime error handling, allowing the workflow engine to report
2532
+ meaningful context when atom execution fails.
2533
+
2519
2534
Args:
2520
2535
atom_name: The name to assign to the reactive atom.
2521
2536
callsite_deps: A list of atom names this atom depends on.
2537
+ callsite_metadata: Optional dictionary with source information, such as filename, lineno, source
2522
2538
2523
2539
Returns:
2524
- An `ast.Call` node representing the decorator.
2540
+ An `ast.Call` node representing the fully parameterized decorator.
2525
2541
"""
2526
2542
2527
2543
keywords = [ast .keyword (arg = "name" , value = ast .Constant (value = atom_name ))]
@@ -2538,6 +2554,17 @@ def _create_workflow_atom_decorator(self, atom_name: str, callsite_deps: list[st
2538
2554
)
2539
2555
)
2540
2556
2557
+ if callsite_metadata is not None :
2558
+ keywords .append (
2559
+ ast .keyword (
2560
+ arg = "callsite_metadata" ,
2561
+ value = ast .Dict (
2562
+ keys = [ast .Constant (value = k ) for k in callsite_metadata ],
2563
+ values = [ast .Constant (value = v ) for v in callsite_metadata .values ()],
2564
+ ),
2565
+ )
2566
+ )
2567
+
2541
2568
return ast .Call (
2542
2569
func = ast .Attribute (value = ast .Name (id = "workflow" , ctx = ast .Load ()), attr = "atom" , ctx = ast .Load ()),
2543
2570
args = [],
@@ -2583,7 +2610,7 @@ def lift_component_call_to_atom(self, node: ast.Call, component_id: str, atom_na
2583
2610
patched_call = self ._patch_callsite (node , callsite_deps , component_id )
2584
2611
2585
2612
# Generate the atom function that wraps the patched call
2586
- new_func = self ._build_atom_function (atom_name , component_id , callsite_deps , patched_call , return_target = return_target )
2613
+ new_func = self ._build_atom_function (atom_name , component_id , callsite_deps , patched_call , return_target = return_target , callsite_node = node )
2587
2614
2588
2615
self ._current_frame .generated_atoms .append (new_func )
2589
2616
@@ -2657,6 +2684,31 @@ def generate_component_and_atom_name(self, func_name: str, stmt: ast.stmt | None
2657
2684
logger .debug (f"[AST] Generated names { func_name = } { callsite_hint = } { component_id = } { atom_name = } " )
2658
2685
return component_id , atom_name
2659
2686
2687
+ def _build_callsite_metadata (self , node : ast .AST , filename : str ) -> dict :
2688
+ """
2689
+ Constructs callsite metadata (filename, lineno, source) for a given AST node.
2690
+
2691
+ Returns:
2692
+ A dict with keys: callsite_filename, callsite_lineno, callsite_source
2693
+ """
2694
+ lineno = getattr (node , "lineno" , None )
2695
+ source = ""
2696
+
2697
+ if filename and lineno :
2698
+ try :
2699
+ with open (filename , "r" ) as f :
2700
+ lines = f .readlines ()
2701
+ source = lines [lineno - 1 ].strip () if 0 < lineno <= len (lines ) else ""
2702
+ except Exception :
2703
+ pass
2704
+
2705
+ return {
2706
+ "callsite_filename" : filename ,
2707
+ "callsite_lineno" : lineno ,
2708
+ "callsite_source" : source ,
2709
+ "callsite_hint" : f"{ filename } :{ lineno } " if filename and lineno else None ,
2710
+ }
2711
+
2660
2712
2661
2713
def annotate_parents (tree : ast .AST ) -> ast .AST :
2662
2714
"""
0 commit comments