Skip to content

Commit 61fdd96

Browse files
feat: Improve agent pool efficiency and add performance metrics (#3034)
Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
1 parent c8f7ca7 commit 61fdd96

File tree

2 files changed

+137
-44
lines changed

2 files changed

+137
-44
lines changed

camel/societies/workforce/single_agent_worker.py

Lines changed: 137 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import datetime
1818
import time
1919
from collections import deque
20-
from typing import Any, List, Optional
20+
from typing import Any, Dict, List, Optional
2121

2222
from colorama import Fore
2323

@@ -56,12 +56,19 @@ def __init__(
5656
initial_size: int = 1,
5757
max_size: int = 10,
5858
auto_scale: bool = True,
59-
idle_timeout: float = 180.0, # 3 minutes
59+
idle_timeout: float = 180.0,
60+
max_tasks_per_agent: int = 10,
61+
min_cleanup_interval: float = 15.0,
62+
max_cleanup_interval: float = 120.0,
6063
):
6164
self.base_agent = base_agent
6265
self.max_size = max_size
6366
self.auto_scale = auto_scale
6467
self.idle_timeout = idle_timeout
68+
self._max_tasks_per_agent = max_tasks_per_agent
69+
self.min_cleanup_interval = min_cleanup_interval
70+
self.max_cleanup_interval = max_cleanup_interval
71+
self._agent_metadata_pool: Dict[int, Dict[str, Any]] = {}
6572

6673
# Pool management
6774
self._available_agents: deque = deque()
@@ -73,6 +80,9 @@ def __init__(
7380
self._total_borrows = 0
7481
self._total_clones_created = 0
7582
self._pool_hits = 0
83+
self._agents_cleaned = 0
84+
self._tasks_waited = 0
85+
self._total_wait_time = 0.0
7686

7787
# Initialize pool
7888
self._initialize_pool(initial_size)
@@ -86,61 +96,108 @@ def _initialize_pool(self, size: int) -> None:
8696
def _create_fresh_agent(self) -> ChatAgent:
8797
r"""Create a fresh agent instance."""
8898
agent = self.base_agent.clone(with_memory=False)
99+
self._agent_metadata_pool[id(agent)] = {
100+
'task_count': 0,
101+
'error_count': 0,
102+
'total_tokens_used': 0,
103+
'average_tokens_per_task': 0,
104+
}
89105
self._total_clones_created += 1
90106
return agent
91107

92-
async def get_agent(self) -> ChatAgent:
108+
def _calculate_affinity_score(
109+
self,
110+
agent: ChatAgent,
111+
metric_weights: Optional[List[float]] = None,
112+
default_fresh_agent_score: float = 0.75,
113+
) -> float:
114+
r"""Calculate the affinity score of a task based on its metadata."""
115+
if metric_weights is None:
116+
metric_weights = [0.7, 0.3]
117+
metadata = self._agent_metadata_pool.get(id(agent), {})
118+
119+
success_rate = (
120+
1 - (metadata['error_count'] / metadata['task_count'])
121+
if metadata['task_count'] > 0
122+
else default_fresh_agent_score
123+
)
124+
125+
freshness = 1.0 - (metadata['task_count'] / self._max_tasks_per_agent)
126+
127+
return (metric_weights[0] * success_rate) + (
128+
metric_weights[1] * max(freshness, 0.0)
129+
)
130+
131+
async def get_agent(
132+
self,
133+
metric_weights: Optional[List[float]] = None,
134+
default_fresh_agent_score: float = 0.75,
135+
) -> ChatAgent:
93136
r"""Get an agent from the pool, creating one if necessary."""
94137
async with self._lock:
95138
self._total_borrows += 1
139+
best_agent: Optional[ChatAgent] = None
140+
metric_weights = metric_weights or [0.7, 0.3]
96141

97-
# Try to get from available agents first
98142
if self._available_agents:
99-
agent = self._available_agents.popleft()
100-
self._in_use_agents.add(id(agent))
143+
best_agent = max(
144+
self._available_agents,
145+
key=lambda agent: self._calculate_affinity_score(
146+
agent, metric_weights, default_fresh_agent_score
147+
),
148+
)
149+
self._available_agents.remove(best_agent)
101150
self._pool_hits += 1
102151

103-
# Reset the agent state
104-
agent.reset()
105-
return agent
152+
elif len(self._in_use_agents) < self.max_size or self.auto_scale:
153+
best_agent = self._create_fresh_agent()
106154

107-
# Check if we can create new agents
108-
total_agents = len(self._available_agents) + len(
109-
self._in_use_agents
110-
)
111-
if total_agents < self.max_size:
112-
agent = self._create_fresh_agent()
113-
self._in_use_agents.add(id(agent))
114-
return agent
115-
116-
# Pool exhausted, wait and retry or create temporary agent
117-
if self.auto_scale:
118-
# Create a temporary agent that won't be returned to pool
119-
return self._create_fresh_agent()
120155
else:
121-
# Wait for an agent to become available
156+
wait_start = time.time()
157+
self._tasks_waited += 1
122158
while not self._available_agents:
123159
await asyncio.sleep(0.1)
160+
self._total_wait_time += time.time() - wait_start
161+
162+
best_agent = max(
163+
self._available_agents,
164+
key=lambda agent: self._calculate_affinity_score(
165+
agent, metric_weights, default_fresh_agent_score
166+
),
167+
)
168+
self._available_agents.remove(best_agent)
169+
self._pool_hits += 1
124170

125-
agent = self._available_agents.popleft()
126-
self._in_use_agents.add(id(agent))
127-
agent.reset()
128-
return agent
171+
best_agent.reset()
172+
self._in_use_agents.add(id(best_agent))
173+
return best_agent
129174

130-
async def return_agent(self, agent: ChatAgent) -> None:
175+
async def return_agent(
176+
self, agent: ChatAgent, task_status: Optional[str] = None
177+
) -> None:
131178
r"""Return an agent to the pool."""
132179
async with self._lock:
133180
agent_id = id(agent)
181+
if agent_id not in self._in_use_agents:
182+
return
183+
184+
if agent_id in self._agent_metadata_pool:
185+
metadata = self._agent_metadata_pool.get(agent_id, {})
186+
metadata['task_count'] += 1
187+
if task_status == 'FAILED':
188+
metadata['error_count'] += 1
189+
190+
_, final_token_count = agent.memory.get_context()
191+
metadata['total_tokens_used'] += final_token_count
192+
metadata['average_tokens_per_task'] = (
193+
metadata['total_tokens_used'] / metadata['task_count']
194+
)
134195

135-
if agent_id in self._in_use_agents:
136-
self._in_use_agents.remove(agent_id)
196+
self._agent_last_used[agent_id] = time.time()
137197

138-
# Only return to pool if we're under max size
139-
if len(self._available_agents) < self.max_size:
140-
# Reset agent state before returning to pool
141-
agent.reset()
142-
self._available_agents.append(agent)
143-
self._agent_last_used[agent_id] = time.time()
198+
self._in_use_agents.remove(agent_id)
199+
if len(self._available_agents) < self.max_size:
200+
self._available_agents.append(agent)
144201

145202
async def cleanup_idle_agents(self) -> None:
146203
r"""Remove idle agents from the pool to free memory."""
@@ -155,23 +212,41 @@ async def cleanup_idle_agents(self) -> None:
155212
agent_id = id(agent)
156213
last_used = self._agent_last_used.get(agent_id, current_time)
157214

158-
if current_time - last_used > self.idle_timeout:
215+
agent_metadata = self._agent_metadata_pool.get(agent_id, {})
216+
agent_token_limit = (
217+
agent.memory.get_context_creator().token_limit
218+
)
219+
220+
if (
221+
current_time - last_used > self.idle_timeout
222+
or agent_metadata['task_count']
223+
>= self._max_tasks_per_agent
224+
or agent_metadata['total_tokens_used']
225+
>= agent_token_limit * 0.8
226+
):
159227
agents_to_remove.append(agent)
160228

229+
self._agents_cleaned += len(agents_to_remove)
230+
161231
for agent in agents_to_remove:
162232
self._available_agents.remove(agent)
163-
agent_id = id(agent)
164-
self._agent_last_used.pop(agent_id, None)
233+
self._agent_last_used.pop(id(agent), None)
234+
self._agent_metadata_pool.pop(id(agent), None)
165235

166236
def get_stats(self) -> dict:
167237
r"""Get pool statistics."""
168238
return {
169239
"available_agents": len(self._available_agents),
170240
"in_use_agents": len(self._in_use_agents),
241+
"pool_size": len(self._agent_metadata_pool),
171242
"total_borrows": self._total_borrows,
172243
"total_clones_created": self._total_clones_created,
173244
"pool_hits": self._pool_hits,
174245
"hit_rate": self._pool_hits / max(self._total_borrows, 1),
246+
"agents_cleaned_up": self._agents_cleaned,
247+
"tasks_had_to_wait": self._tasks_waited,
248+
"average_wait_time": self._total_wait_time
249+
/ max(self._tasks_waited, 1),
175250
}
176251

177252

@@ -258,10 +333,12 @@ async def _get_worker_agent(self) -> ChatAgent:
258333
# Fallback to original cloning approach
259334
return self.worker.clone(with_memory=False)
260335

261-
async def _return_worker_agent(self, agent: ChatAgent) -> None:
336+
async def _return_worker_agent(
337+
self, agent: ChatAgent, task_stat: str
338+
) -> None:
262339
r"""Return a worker agent to the pool if pooling is enabled."""
263340
if self.use_agent_pool and self.agent_pool:
264-
await self.agent_pool.return_agent(agent)
341+
await self.agent_pool.return_agent(agent, task_stat)
265342
# If not using pool, agent will be garbage collected
266343

267344
async def _process_task(
@@ -391,7 +468,7 @@ async def _process_task(
391468
return TaskState.FAILED
392469
finally:
393470
# Return agent to pool or let it be garbage collected
394-
await self._return_worker_agent(worker_agent)
471+
await self._return_worker_agent(worker_agent, task.state.value)
395472

396473
# Populate additional_info with worker attempt details
397474
if task.additional_info is None:
@@ -477,7 +554,24 @@ async def _periodic_cleanup(self):
477554
r"""Periodically clean up idle agents from the pool."""
478555
while True:
479556
try:
480-
await asyncio.sleep(60) # Cleanup every minute
557+
idle_ratio = (
558+
len(self.agent_pool.get_stats()["available_agents"])
559+
/ self.agent_pool.max_size
560+
if self.agent_pool.max_size > 0
561+
else 0.0
562+
)
563+
564+
sleep_duration = (
565+
self.agent_pool.min_cleanup_interval
566+
+ (
567+
self.agent_pool.max_cleanup_interval
568+
- self.agent_pool.min_cleanup_interval
569+
)
570+
* idle_ratio
571+
)
572+
573+
await asyncio.sleep(sleep_duration)
574+
481575
if self.agent_pool:
482576
await self.agent_pool.cleanup_idle_agents()
483577
except asyncio.CancelledError:

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,6 @@ module = [
647647
"langfuse.*",
648648
"outlines.*",
649649
"torch.*",
650-
"sentence_transformers.*",
651650
"cv2",
652651
"rlcard.*",
653652
"pytesseract",

0 commit comments

Comments
 (0)