Skip to content

Commit 9f91e8e

Browse files
authored
enhance: Improve agent pool efficiency and add performance metrics PR… (#3108)
1 parent 2f6a326 commit 9f91e8e

File tree

2 files changed

+51
-138
lines changed

2 files changed

+51
-138
lines changed

camel/societies/workforce/single_agent_worker.py

Lines changed: 49 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import datetime
1818
import time
1919
from collections import deque
20-
from typing import Any, Dict, List, Optional
20+
from typing import Any, List, Optional
2121

2222
from colorama import Fore
2323

@@ -35,8 +35,8 @@
3535
class AgentPool:
3636
r"""A pool of agent instances for efficient reuse.
3737
38-
This pool manages a collection of pre-cloned agents. It supports
39-
auto-scaling based ondemand and intelligent reuse of existing agents.
38+
This pool manages a collection of pre-cloned agents with automatic
39+
scaling and idle timeout cleanup.
4040
4141
Args:
4242
base_agent (ChatAgent): The base agent to clone from.
@@ -48,6 +48,8 @@ class AgentPool:
4848
(default: :obj:`True`)
4949
idle_timeout (float): Time in seconds after which idle agents are
5050
removed. (default: :obj:`180.0`)
51+
cleanup_interval (float): Fixed interval in seconds between cleanup
52+
checks. (default: :obj:`60.0`)
5153
"""
5254

5355
def __init__(
@@ -57,18 +59,13 @@ def __init__(
5759
max_size: int = 10,
5860
auto_scale: bool = True,
5961
idle_timeout: float = 180.0,
60-
max_tasks_per_agent: int = 10,
61-
min_cleanup_interval: float = 15.0,
62-
max_cleanup_interval: float = 120.0,
62+
cleanup_interval: float = 60.0,
6363
):
6464
self.base_agent = base_agent
6565
self.max_size = max_size
6666
self.auto_scale = auto_scale
6767
self.idle_timeout = idle_timeout
68-
self._max_tasks_per_agent = max_tasks_per_agent
69-
self.min_cleanup_interval = min_cleanup_interval
70-
self.max_cleanup_interval = max_cleanup_interval
71-
self._agent_metadata_pool: Dict[int, Dict[str, Any]] = {}
68+
self.cleanup_interval = cleanup_interval
7269

7370
# Pool management
7471
self._available_agents: deque = deque()
@@ -81,8 +78,6 @@ def __init__(
8178
self._total_clones_created = 0
8279
self._pool_hits = 0
8380
self._agents_cleaned = 0
84-
self._tasks_waited = 0
85-
self._total_wait_time = 0.0
8681

8782
# Initialize pool
8883
self._initialize_pool(initial_size)
@@ -92,161 +87,95 @@ def _initialize_pool(self, size: int) -> None:
9287
for _ in range(min(size, self.max_size)):
9388
agent = self._create_fresh_agent()
9489
self._available_agents.append(agent)
90+
self._agent_last_used[id(agent)] = time.time()
9591

9692
def _create_fresh_agent(self) -> ChatAgent:
9793
r"""Create a fresh agent instance."""
9894
agent = self.base_agent.clone(with_memory=False)
99-
self._agent_metadata_pool[id(agent)] = {
100-
'task_count': 0,
101-
'error_count': 0,
102-
'total_tokens_used': 0,
103-
'average_tokens_per_task': 0,
104-
}
10595
self._total_clones_created += 1
10696
return agent
10797

108-
def _calculate_affinity_score(
109-
self,
110-
agent: ChatAgent,
111-
metric_weights: Optional[List[float]] = None,
112-
default_fresh_agent_score: float = 0.75,
113-
) -> float:
114-
r"""Calculate the affinity score of a task based on its metadata."""
115-
if metric_weights is None:
116-
metric_weights = [0.7, 0.3]
117-
metadata = self._agent_metadata_pool.get(id(agent), {})
118-
119-
success_rate = (
120-
1 - (metadata['error_count'] / metadata['task_count'])
121-
if metadata['task_count'] > 0
122-
else default_fresh_agent_score
123-
)
124-
125-
freshness = 1.0 - (metadata['task_count'] / self._max_tasks_per_agent)
126-
127-
return (metric_weights[0] * success_rate) + (
128-
metric_weights[1] * max(freshness, 0.0)
129-
)
130-
131-
async def get_agent(
132-
self,
133-
metric_weights: Optional[List[float]] = None,
134-
default_fresh_agent_score: float = 0.75,
135-
) -> ChatAgent:
98+
async def get_agent(self) -> ChatAgent:
13699
r"""Get an agent from the pool, creating one if necessary."""
137100
async with self._lock:
138101
self._total_borrows += 1
139-
best_agent: Optional[ChatAgent] = None
140-
metric_weights = metric_weights or [0.7, 0.3]
141102

142103
if self._available_agents:
143-
best_agent = max(
144-
self._available_agents,
145-
key=lambda agent: self._calculate_affinity_score(
146-
agent, metric_weights, default_fresh_agent_score
147-
),
148-
)
149-
self._available_agents.remove(best_agent)
104+
agent = self._available_agents.popleft()
105+
self._in_use_agents.add(id(agent))
150106
self._pool_hits += 1
107+
return agent
151108

152-
elif len(self._in_use_agents) < self.max_size or self.auto_scale:
153-
best_agent = self._create_fresh_agent()
109+
# Check if we can create a new agent
110+
if len(self._in_use_agents) < self.max_size or self.auto_scale:
111+
agent = self._create_fresh_agent()
112+
self._in_use_agents.add(id(agent))
113+
return agent
154114

155-
else:
156-
wait_start = time.time()
157-
self._tasks_waited += 1
158-
while not self._available_agents:
159-
await asyncio.sleep(0.1)
160-
self._total_wait_time += time.time() - wait_start
161-
162-
best_agent = max(
163-
self._available_agents,
164-
key=lambda agent: self._calculate_affinity_score(
165-
agent, metric_weights, default_fresh_agent_score
166-
),
167-
)
168-
self._available_agents.remove(best_agent)
169-
self._pool_hits += 1
170-
171-
best_agent.reset()
172-
self._in_use_agents.add(id(best_agent))
173-
return best_agent
174-
175-
async def return_agent(
176-
self, agent: ChatAgent, task_status: Optional[str] = None
177-
) -> None:
115+
# Wait for available agent
116+
while True:
117+
async with self._lock:
118+
if self._available_agents:
119+
agent = self._available_agents.popleft()
120+
self._in_use_agents.add(id(agent))
121+
self._pool_hits += 1
122+
return agent
123+
await asyncio.sleep(0.05)
124+
125+
async def return_agent(self, agent: ChatAgent) -> None:
178126
r"""Return an agent to the pool."""
127+
agent_id = id(agent)
128+
179129
async with self._lock:
180-
agent_id = id(agent)
181130
if agent_id not in self._in_use_agents:
182131
return
183132

184-
if agent_id in self._agent_metadata_pool:
185-
metadata = self._agent_metadata_pool.get(agent_id, {})
186-
metadata['task_count'] += 1
187-
if task_status == 'FAILED':
188-
metadata['error_count'] += 1
133+
self._in_use_agents.discard(agent_id)
189134

190-
_, final_token_count = agent.memory.get_context()
191-
metadata['total_tokens_used'] += final_token_count
192-
metadata['average_tokens_per_task'] = (
193-
metadata['total_tokens_used'] / metadata['task_count']
194-
)
195-
196-
self._agent_last_used[agent_id] = time.time()
197-
198-
self._in_use_agents.remove(agent_id)
135+
# Only add back to pool if under max size
199136
if len(self._available_agents) < self.max_size:
137+
agent.reset()
138+
self._agent_last_used[agent_id] = time.time()
200139
self._available_agents.append(agent)
140+
else:
141+
# Remove tracking for agents not returned to pool
142+
self._agent_last_used.pop(agent_id, None)
201143

202144
async def cleanup_idle_agents(self) -> None:
203145
r"""Remove idle agents from the pool to free memory."""
204146
if not self.auto_scale:
205147
return
206148

207149
async with self._lock:
150+
if not self._available_agents:
151+
return
152+
208153
current_time = time.time()
209154
agents_to_remove = []
210155

211156
for agent in list(self._available_agents):
212157
agent_id = id(agent)
213158
last_used = self._agent_last_used.get(agent_id, current_time)
214-
215-
agent_metadata = self._agent_metadata_pool.get(agent_id, {})
216-
agent_token_limit = (
217-
agent.memory.get_context_creator().token_limit
218-
)
219-
220-
if (
221-
current_time - last_used > self.idle_timeout
222-
or agent_metadata['task_count']
223-
>= self._max_tasks_per_agent
224-
or agent_metadata['total_tokens_used']
225-
>= agent_token_limit * 0.8
226-
):
159+
if current_time - last_used > self.idle_timeout:
227160
agents_to_remove.append(agent)
228161

229-
self._agents_cleaned += len(agents_to_remove)
230-
231162
for agent in agents_to_remove:
232163
self._available_agents.remove(agent)
233164
self._agent_last_used.pop(id(agent), None)
234-
self._agent_metadata_pool.pop(id(agent), None)
165+
self._agents_cleaned += 1
235166

236167
def get_stats(self) -> dict:
237168
r"""Get pool statistics."""
238169
return {
239170
"available_agents": len(self._available_agents),
240171
"in_use_agents": len(self._in_use_agents),
241-
"pool_size": len(self._agent_metadata_pool),
172+
"pool_size": len(self._available_agents)
173+
+ len(self._in_use_agents),
242174
"total_borrows": self._total_borrows,
243175
"total_clones_created": self._total_clones_created,
244176
"pool_hits": self._pool_hits,
245177
"hit_rate": self._pool_hits / max(self._total_borrows, 1),
246178
"agents_cleaned_up": self._agents_cleaned,
247-
"tasks_had_to_wait": self._tasks_waited,
248-
"average_wait_time": self._total_wait_time
249-
/ max(self._tasks_waited, 1),
250179
}
251180

252181

@@ -333,12 +262,10 @@ async def _get_worker_agent(self) -> ChatAgent:
333262
# Fallback to original cloning approach
334263
return self.worker.clone(with_memory=False)
335264

336-
async def _return_worker_agent(
337-
self, agent: ChatAgent, task_stat: str
338-
) -> None:
265+
async def _return_worker_agent(self, agent: ChatAgent) -> None:
339266
r"""Return a worker agent to the pool if pooling is enabled."""
340267
if self.use_agent_pool and self.agent_pool:
341-
await self.agent_pool.return_agent(agent, task_stat)
268+
await self.agent_pool.return_agent(agent)
342269
# If not using pool, agent will be garbage collected
343270

344271
async def _process_task(
@@ -468,7 +395,7 @@ async def _process_task(
468395
return TaskState.FAILED
469396
finally:
470397
# Return agent to pool or let it be garbage collected
471-
await self._return_worker_agent(worker_agent, task.state.value)
398+
await self._return_worker_agent(worker_agent)
472399

473400
# Populate additional_info with worker attempt details
474401
if task.additional_info is None:
@@ -554,23 +481,8 @@ async def _periodic_cleanup(self):
554481
r"""Periodically clean up idle agents from the pool."""
555482
while True:
556483
try:
557-
idle_ratio = (
558-
len(self.agent_pool.get_stats()["available_agents"])
559-
/ self.agent_pool.max_size
560-
if self.agent_pool.max_size > 0
561-
else 0.0
562-
)
563-
564-
sleep_duration = (
565-
self.agent_pool.min_cleanup_interval
566-
+ (
567-
self.agent_pool.max_cleanup_interval
568-
- self.agent_pool.min_cleanup_interval
569-
)
570-
* idle_ratio
571-
)
572-
573-
await asyncio.sleep(sleep_duration)
484+
# Fixed interval cleanup
485+
await asyncio.sleep(self.agent_pool.cleanup_interval)
574486

575487
if self.agent_pool:
576488
await self.agent_pool.cleanup_idle_agents()

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,8 @@ module = [
655655
"pgvector.*",
656656
"reportlab.*",
657657
"surrealdb.*",
658-
"microsandbox.*"
658+
"microsandbox.*",
659+
"sentence_transformers.*"
659660
]
660661
ignore_missing_imports = true
661662

0 commit comments

Comments
 (0)