
Commit ab67893

Merge pull request #218 from raga-ai-hub/v2.1.7
v2.1.7.4
2 parents 14ac5a6 + 263502a commit ab67893

9 files changed (+317, -126 lines)


pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ description = "RAGA AI CATALYST"
 readme = "README.md"
 requires-python = ">=3.10,<=3.13.2"
 # license = {file = "LICENSE"}
-version = "2.1.6.4"
+version = "2.1.7.4"
 authors = [
     {name = "Kiran Scaria", email = "kiran.scaria@raga.ai"},
     {name = "Kedar Gaikwad", email = "kedar.gaikwad@raga.ai"},

ragaai_catalyst/tracers/agentic_tracing/upload/trace_uploader.py

Lines changed: 2 additions & 2 deletions
@@ -157,7 +157,7 @@ def process_upload(task_id: str, filepath: str, hash_id: str, zip_path: str,
             )
             logger.info(f"Trace metrics uploaded: {response}")
         except Exception as e:
-            logger.error(f"Error uploading trace metrics: {e}")
+            logger.error(f"Error uploading trace trace uploader metrics: {e}")
             # Continue with other uploads
     else:
         logger.warning(f"Trace file {filepath} not found, skipping metrics upload")
@@ -364,4 +364,4 @@ def run_daemon():
     if args.daemon:
         logger.info("Daemon mode not needed in futures implementation")
     else:
-        logger.info("Interactive mode not needed in futures implementation")
+        logger.info("Interactive mode not needed in futures implementation")

In the second hunk the removed and added lines are textually identical; the visible change is most likely a trailing newline being added at end of file.

ragaai_catalyst/tracers/exporters/ragaai_trace_exporter.py

Lines changed: 11 additions & 8 deletions
@@ -89,15 +89,15 @@ def process_complete_trace(self, spans, trace_id):
         except Exception as e:
             print(f"Error converting trace {trace_id}: {e}")
             return  # Exit early if conversion fails
-
+
         # Check if trace details are None (conversion failed)
         if ragaai_trace_details is None:
             logger.error(f"Cannot upload trace {trace_id}: conversion failed and returned None")
             return  # Exit early if conversion failed

         # Upload the trace if upload_trace function is provided
         try:
-            if self.post_processor!=None:
+            if self.post_processor!=None and self.tracer_type != "langchain":
                 ragaai_trace_details['trace_file_path'] = self.post_processor(ragaai_trace_details['trace_file_path'])
             if self.tracer_type == "langchain":
                 # Check if we're already in an event loop

The new guard skips post-processing here for langchain traces; as the hunks below show, the langchain path now applies the post-processor inside upload_rag_trace instead.
@@ -106,18 +106,18 @@ def process_complete_trace(self, spans, trace_id):
                     if loop.is_running():
                         # We're in a running event loop (like in Colab/Jupyter)
                         # Create a future and run the coroutine
-                        future = asyncio.ensure_future(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id))
+                        future = asyncio.ensure_future(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id, self.post_processor))
                         # We don't wait for it to complete as this would block the event loop
                         logger.info(f"Scheduled async upload for trace {trace_id} in existing event loop")
                     else:
                         # No running event loop, use asyncio.run()
-                        asyncio.run(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id))
+                        asyncio.run(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id, self.post_processor))
                 except RuntimeError:
                     # No event loop exists, create one
-                    asyncio.run(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id))
+                    asyncio.run(self.upload_rag_trace(ragaai_trace_details, additional_metadata, trace_id, self.post_processor))
             else:
                 self.upload_trace(ragaai_trace_details, trace_id)
-        except Exception as e:
+        except Exception as e:
             print(f"Error uploading trace {trace_id}: {e}")

     def prepare_trace(self, spans, trace_id):
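
The recurring pattern in this hunk — attach the coroutine to a running loop when one exists (as in Colab/Jupyter), otherwise fall back to asyncio.run() — is worth seeing in isolation. A self-contained sketch of that dispatch logic, with illustrative names that are not from the codebase:

    import asyncio

    async def upload(trace_id: str) -> None:
        # Stand-in for the real async upload coroutine
        print(f"uploading {trace_id}")

    def schedule_upload(trace_id: str) -> None:
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # Already inside an event loop (e.g. a notebook):
                # fire-and-forget so we don't block the loop
                asyncio.ensure_future(upload(trace_id))
            else:
                # A loop object exists but is not running
                asyncio.run(upload(trace_id))
        except RuntimeError:
            # No event loop in this thread: create one
            asyncio.run(upload(trace_id))

    schedule_upload("trace-123")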
@@ -206,13 +206,16 @@ def upload_trace(self, ragaai_trace_details, trace_id):

         logger.info(f"Submitted upload task with ID: {self.upload_task_id}")

-    async def upload_rag_trace(self, ragaai_trace, additional_metadata, trace_id):
+    async def upload_rag_trace(self, ragaai_trace, additional_metadata, trace_id, post_processor=None):
         try:
             ragaai_trace[0]['external_id'] = self.external_id
             trace_file_path = os.path.join(self.tmp_dir, f"{trace_id}.json")
             with open(trace_file_path, 'w') as f:
                 json.dump(ragaai_trace, f, indent=2)
             logger.info(f"Trace file saved at {trace_file_path}")
+            if self.post_processor!=None:
+                trace_file_path = self.post_processor(trace_file_path)
+                logger.info(f"After post processing Trace file saved at {trace_file_path}")

             # Create a ThreadPoolExecutor with max_workers=30
             with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_upload_workers) as executor:
@@ -227,7 +230,7 @@ async def upload_rag_trace(self, ragaai_trace, additional_metadata, trace_id):
                         base_url=self.base_url
                     ).upload_traces,
                     additional_metadata_keys=additional_metadata
-                )
+                )

             # Implement retry logic - attempt upload up to 3 times
             max_retries = 3

The closing-parenthesis pair in the second hunk appears to differ only in whitespace.
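
The hunk cuts off just before the retry loop ("attempt upload up to 3 times"). The general shape of submitting a blocking upload to an executor and retrying is roughly as follows — a hedged sketch under assumed names, not the file's actual implementation:

    import concurrent.futures
    import logging

    logger = logging.getLogger(__name__)

    def upload_with_retries(upload_fn, *args, max_retries=3, **kwargs):
        # upload_fn is a stand-in for the real uploader callable
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            for attempt in range(1, max_retries + 1):
                future = executor.submit(upload_fn, *args, **kwargs)
                try:
                    return future.result()
                except Exception as e:
                    logger.warning(f"Upload attempt {attempt}/{max_retries} failed: {e}")
        raise RuntimeError(f"Upload failed after {max_retries} attempts")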

ragaai_catalyst/tracers/tracer.py

Lines changed: 41 additions & 6 deletions
@@ -427,12 +427,20 @@ def file_post_processor(original_trace_json_path: os.PathLike) -> os.PathLike:
         with open(original_path, 'r') as f:
             data = json.load(f)

-        # Apply masking only to data['data']
-        data['data'] = recursive_mask_values(data['data'])
-
-        # Create new filename with 'processed_' prefix in /var/tmp/
+        # Apply masking only to data['data'] or in case of langchain rag apply on 'traces' field of each element
+        if 'data' in data:
+            data['data'] = recursive_mask_values(data['data'])
+        elif isinstance(data,list):
+            masked_traces = []
+            for item in data:
+                if isinstance(item, dict) and 'traces' in item:
+                    item['traces'] = recursive_mask_values(item['traces'])
+                masked_traces.append(item)
+            data = masked_traces
+        # Create new filename with 'processed_' prefix
         new_filename = f"processed_{original_path.name}"
-        final_trace_json_path = Path("/var/tmp") / new_filename
+        dir_name, original_filename = os.path.split(original_trace_json_path)
+        final_trace_json_path = Path(dir_name) / new_filename

         # Write modified data to the new file
         with open(final_trace_json_path, 'w') as f:
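
The reworked masking now covers two trace layouts — a dict with a top-level 'data' key, and (for LangChain RAG traces) a list of dicts each carrying a 'traces' field — and the processed file now lands next to the original instead of in /var/tmp. recursive_mask_values itself is not shown in this diff; a plausible minimal sketch of such a masker, for orientation only:

    def recursive_mask_values(obj, mask="***"):
        # Walk dicts and lists, replacing every leaf string with a mask.
        # Illustrative only; the real function may mask more selectively.
        if isinstance(obj, dict):
            return {k: recursive_mask_values(v, mask) for k, v in obj.items()}
        if isinstance(obj, list):
            return [recursive_mask_values(v, mask) for v in obj]
        if isinstance(obj, str):
            return mask
        return obj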
@@ -538,7 +546,7 @@ def set_dataset_name(self, dataset_name):

     def _improve_metadata(self, metadata, tracer_type):
         if metadata is None:
-            metadata = {"metadata": {}}
+            metadata = {}
         metadata.setdefault("log_source", f"{tracer_type}_tracer")
         metadata.setdefault("recorded_on", str(datetime.datetime.now()))
         return metadata
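
The old default nested one level too deep, so the setdefault calls put log_source and recorded_on beside the inner "metadata" dict rather than into a flat metadata dict. A quick demonstration of the corrected behavior:

    import datetime

    def _improve_metadata(metadata, tracer_type):
        if metadata is None:
            metadata = {}   # was {"metadata": {}} before this commit
        metadata.setdefault("log_source", f"{tracer_type}_tracer")
        metadata.setdefault("recorded_on", str(datetime.datetime.now()))
        return metadata

    print(_improve_metadata(None, "langchain"))
    # {'log_source': 'langchain_tracer', 'recorded_on': '2025-...'}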
@@ -859,3 +867,30 @@ def add_context(self, context):
             self.user_context = context
         else:
             raise TypeError("context must be a string")
+
+    def add_metadata(self, metadata):
+        """
+        Add metadata information to the trace. This method is only supported for 'langchain' and 'llamaindex' tracer types.
+
+        Args:
+            metadata: Additional metadata information to be added to the trace. Can be a dictionary.
+
+        Raises:
+            ValueError: If tracer_type is not 'langchain' or 'llamaindex'.
+        """
+        if self.tracer_type not in ["langchain", "llamaindex"]:
+            raise ValueError("add_metadata is only supported for 'langchain' and 'llamaindex' tracer types")
+
+        # Convert string metadata to string if needed
+        user_details = self.user_details
+        user_metadata = user_details["trace_user_detail"]["metadata"]
+        if isinstance(metadata, dict):
+            for key, value in metadata.items():
+                if key in user_metadata:
+                    user_metadata[key] = value
+                else:
+                    raise ValueError(f"Key '{key}' not found in metadata")
+            self.dynamic_exporter.user_details = user_details
+            self.metadata = user_metadata
+        else:
+            raise TypeError("metadata must be a dictionary")
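
Note that add_metadata only overwrites keys that already exist in the trace's user metadata; unknown keys raise ValueError. A hedged usage sketch — the constructor arguments here are illustrative, not a verbatim API reference:

    from ragaai_catalyst import Tracer  # import path assumed

    # Keys must be registered when the tracer is created ...
    tracer = Tracer(
        project_name="my_project",
        dataset_name="my_dataset",
        tracer_type="langchain",
        metadata={"customer_id": "unknown"},
    )

    # ... so this overwrites the pre-registered key for the current trace
    tracer.add_metadata({"customer_id": "cust-42"})

    # This would raise ValueError: "region" was never registered
    # tracer.add_metadata({"region": "us-east-1"})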

ragaai_catalyst/tracers/upload_traces.py

Lines changed: 8 additions & 0 deletions
@@ -36,15 +36,23 @@ def _create_dataset_schema_with_trace(self, additional_metadata_keys=None, addit
             "model_name": {"columnType": "metadata"},
             "total_cost": {"columnType": "metadata", "dataType": "numerical"},
             "total_latency": {"columnType": "metadata", "dataType": "numerical"},
+            "error": {"columnType": "metadata"}
         }

         if additional_metadata_keys:
             for key in additional_metadata_keys:
                 if key == "model_name":
                     SCHEMA_MAPPING_NEW['response']["modelName"] = additional_metadata_keys[key]
+                elif key == "error":
+                    pass
                 else:
                     SCHEMA_MAPPING_NEW[key] = {"columnType": key, "parentColumn": "response"}

+        if self.user_detail and self.user_detail["trace_user_detail"]["metadata"]:
+            for key in self.user_detail["trace_user_detail"]["metadata"]:
+                if key not in SCHEMA_MAPPING_NEW:
+                    SCHEMA_MAPPING_NEW[key] = {"columnType": "metadata"}
+
         if additional_pipeline_keys:
             for key in additional_pipeline_keys:
                 SCHEMA_MAPPING_NEW[key] = {"columnType": "pipeline"}
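
The new block gives every key from user_detail["trace_user_detail"]["metadata"] a plain metadata column unless a column of that name already exists. For a hypothetical user metadata key "customer_id", the effect is:

    # Sketch of the effect on the schema mapping (names illustrative)
    user_metadata = {"customer_id": "cust-42", "model_name": "gpt-4"}
    SCHEMA_MAPPING_NEW = {
        "model_name": {"columnType": "metadata"},
        "error": {"columnType": "metadata"},  # new in this commit
    }
    for key in user_metadata:
        if key not in SCHEMA_MAPPING_NEW:  # "model_name" is skipped
            SCHEMA_MAPPING_NEW[key] = {"columnType": "metadata"}
    # SCHEMA_MAPPING_NEW now also maps "customer_id" to a metadata column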

ragaai_catalyst/tracers/utils/convert_langchain_callbacks_output.py

Lines changed: 1 addition & 0 deletions
@@ -58,4 +58,5 @@ def convert_langchain_callbacks_output(result, project_name="", metadata="", pip

     initial_struc[0]["traces"] = traces_data

+    initial_struc[0]["error"] = result["error"]
     return initial_struc
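
This propagates the callback result's error field into the converted structure, pairing with the new "error" metadata column registered in upload_traces.py above. The output shape after the change, with illustrative values:

    # Illustrative shape of the converter's output after this change
    initial_struc = [
        {
            "project_name": "my_project",  # illustrative value
            "traces": [],                  # converted span records go here
            "error": None,                 # newly copied from result["error"]
        }
    ]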
