Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions camel/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import abc
import copy
import json
import os
import re
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -122,6 +124,14 @@ def __init__(
)
self._log_dir = os.environ.get("CAMEL_LOG_DIR", "camel_logs")

# Snapshot cleaning configuration
self._snapshot_cleanup_trigger = int(
os.environ.get("CAMEL_SNAPSHOT_CLEANUP_TRIGGER", "3")
)
self._snapshot_keep_recent = int(
os.environ.get("CAMEL_SNAPSHOT_KEEP_RECENT", "1")
)

@property
@abstractmethod
def token_counter(self) -> BaseTokenCounter:
Expand Down Expand Up @@ -258,6 +268,146 @@ def preprocess_messages(

return formatted_messages

def _count_snapshots(self, messages: List[OpenAIMessage]) -> int:
"""Count the number of messages containing snapshot key"""
count = 0
for msg in messages:
if msg.get('role') == 'tool' and msg.get('content'):
try:
content_str = msg['content']
if (
isinstance(content_str, str)
and "'snapshot':" in content_str
):
count += 1
continue

# Try to parse as JSON
if isinstance(content_str, str):
try:
content = json.loads(content_str)
except:
import ast

content = ast.literal_eval(content_str)
else:
content = content_str

if isinstance(content, dict) and 'snapshot' in content:
count += 1
logger.debug(
f"Found snapshot #{count} in message (parsed)"
)
except Exception as e:
logger.debug(f"Error parsing message content: {e}")
pass
logger.info(f"Total snapshot count: {count}")
return count

def _clean_old_snapshots(
self, messages: List[OpenAIMessage], keep_recent: int = 1
) -> List[OpenAIMessage]:
"""Clean old snapshots, keeping only the most recent ones"""
logger.info(
f"Starting snapshot cleanup with keep_recent={keep_recent}"
)
messages_copy = copy.deepcopy(messages)

# Find all messages with snapshots
snapshot_indices = []
for i, msg in enumerate(messages_copy):
if msg.get('role') == 'tool' and msg.get('content'):
try:
content_str = msg['content']

if (
isinstance(content_str, str)
and "'snapshot':" in content_str
):
snapshot_indices.append(i)
logger.debug(
f"Found snapshot "
f"at message index {i} (string search)"
)
continue

# Try to parse as JSON
if isinstance(content_str, str):
try:
content = json.loads(content_str)
except:
import ast

content = ast.literal_eval(content_str)
else:
content = content_str

if isinstance(content, dict) and 'snapshot' in content:
snapshot_indices.append(i)
logger.debug(
f"Found snapshot at message index {i} (parsed)"
)
except Exception as e:
logger.debug(f"Error checking message {i}: {e}")
pass

logger.info(f"Found {len(snapshot_indices)} snapshots in messages")

# Keep only the last 'keep_recent' snapshots
if len(snapshot_indices) > keep_recent:
indices_to_clean = snapshot_indices[:-keep_recent]
logger.info(
f"Will clean {len(indices_to_clean)} old snapshots"
f", keeping the last {keep_recent}"
)

for idx in indices_to_clean:
msg = messages_copy[idx]
try:
content_str = msg['content']

if isinstance(content_str, str):
# Try to parse the content
try:
content = json.loads(content_str)
is_json = True
except:
import ast

content = ast.literal_eval(content_str)
is_json = False

# Replace snapshot
content['snapshot'] = (
'snapshot history has been deleted'
)

# Convert back to string in the same format
if is_json:
msg['content'] = json.dumps(content)
else:
# Keep as Python dict string
msg['content'] = str(content)

logger.debug(f"Cleaned snapshot at index {idx}")
elif isinstance(msg['content'], dict):
msg['content']['snapshot'] = (
'snapshot history has been deleted'
)
logger.debug(f"Cleaned snapshot at index {idx}")
except Exception as e:
logger.error(
f"Failed to clean snapshot at index {idx}: {e}"
)
pass
else:
logger.info(
f"No cleaning needed only {len(snapshot_indices)} snapshots"
f"keep_recent is {keep_recent}"
)

return messages_copy

def _log_request(self, messages: List[OpenAIMessage]) -> Optional[str]:
r"""Log the request messages to a JSON file if logging is enabled.

Expand Down Expand Up @@ -410,6 +560,32 @@ def run(
`ChatCompletionStreamManager[BaseModel]` in the structured
stream mode.
"""
# Check if we should clean snapshots
logger.info(
f"Snapshot cleanup config: trigger={self._snapshot_cleanup_trigger}, keep_recent={self._snapshot_keep_recent}"
)
snapshot_count = self._count_snapshots(messages)

if snapshot_count > 0:
logger.info(
f"Checking if {snapshot_count} % {self._snapshot_cleanup_trigger} == 0: {snapshot_count % self._snapshot_cleanup_trigger == 0}"
)
if snapshot_count % self._snapshot_cleanup_trigger == 0:
logger.info(
f"Snapshot count ({snapshot_count}) is multiple of {self._snapshot_cleanup_trigger}, "
f"cleaning old snapshots..."
)
messages = self._clean_old_snapshots(
messages, keep_recent=self._snapshot_keep_recent
)
logger.info(
f"Cleaned snapshots, keeping only the {self._snapshot_keep_recent} most recent"
)
else:
logger.info(
f"No cleaning needed. {snapshot_count} is not a multiple of {self._snapshot_cleanup_trigger}"
)

# Log the request if logging is enabled
log_path = self._log_request(messages)

Expand Down Expand Up @@ -464,6 +640,32 @@ async def arun(
`AsyncChatCompletionStreamManager[BaseModel]` in the structured
stream mode.
"""
# Check if we should clean snapshots
logger.info(
f"Snapshot cleanup config: trigger={self._snapshot_cleanup_trigger}, keep_recent={self._snapshot_keep_recent}"
)
snapshot_count = self._count_snapshots(messages)

if snapshot_count > 0:
logger.info(
f"Checking if {snapshot_count} % {self._snapshot_cleanup_trigger} == 0: {snapshot_count % self._snapshot_cleanup_trigger == 0}"
)
if snapshot_count % self._snapshot_cleanup_trigger == 0:
logger.info(
f"Snapshot count ({snapshot_count}) is multiple of {self._snapshot_cleanup_trigger}, "
f"cleaning old snapshots..."
)
messages = self._clean_old_snapshots(
messages, keep_recent=self._snapshot_keep_recent
)
logger.info(
f"Cleaned snapshots, keeping only the {self._snapshot_keep_recent} most recent"
)
else:
logger.info(
f"No cleaning needed. {snapshot_count} is not a multiple of {self._snapshot_cleanup_trigger}"
)

# Log the request if logging is enabled
log_path = self._log_request(messages)

Expand Down
18 changes: 18 additions & 0 deletions camel/societies/workforce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@ class TaskResult(BaseModel):
)


class QualityEvaluation(BaseModel):
r"""Quality evaluation result for a completed task."""

quality_sufficient: bool = Field(
description="Whether the task result meets quality standards."
)
quality_score: int = Field(
description="Quality score from 0 to 100.", ge=0, le=100
)
issues: List[str] = Field(
default_factory=list,
description="List of quality issues found in the result.",
)
improvement_suggestion: Optional[str] = Field(
default=None, description="Specific suggestion to improve the result."
)


class TaskAssignment(BaseModel):
r"""An individual task assignment within a batch."""

Expand Down
Loading