From 8bbddfe6549ee3d500d794176b3141fb4e764ad2 Mon Sep 17 00:00:00 2001 From: Saedbhati Date: Fri, 26 Sep 2025 16:57:25 +0530 Subject: [PATCH 1/2] feat: Zoho Mail --- camel/societies/workforce/workforce.py | 36 ++ camel/toolkits/zoho_toolkit.py | 815 +++++++++++++++++++++++++ examples/toolkits/zoho_toolkit.py | 224 +++++++ 3 files changed, 1075 insertions(+) create mode 100644 camel/toolkits/zoho_toolkit.py create mode 100644 examples/toolkits/zoho_toolkit.py diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index 0d612e8497..e3b730c1af 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -197,6 +197,12 @@ class Workforce(BaseNode): support native structured output. When disabled, the workforce uses the native response_format parameter. (default: :obj:`True`) + on_subtask_completed (Optional[Callable[[Task], None]], optional): + Callback function to be called when a subtask is completed. + (default: :obj:`None`) + on_subtask_failed (Optional[Callable[[Task], None]], optional): + Callback function to be called when a subtask fails. + (default: :obj:`None`) Example: >>> import asyncio @@ -249,6 +255,8 @@ def __init__( share_memory: bool = False, use_structured_output_handler: bool = True, task_timeout_seconds: Optional[float] = None, + on_subtask_completed: Optional[Callable[[Task], None]] = None, + on_subtask_failed: Optional[Callable[[Task], None]] = None, ) -> None: super().__init__(description) self._child_listening_tasks: Deque[ @@ -265,6 +273,9 @@ def __init__( if self.use_structured_output_handler: self.structured_handler = StructuredOutputHandler() self.metrics_logger = WorkforceLogger(workforce_id=self.node_id) + # Optional user callbacks for subtask lifecycle notifications + self.on_subtask_completed = on_subtask_completed + self.on_subtask_failed = on_subtask_failed self._task: Optional[Task] = None self._pending_tasks: Deque[Task] = deque() self._task_dependencies: Dict[str, List[str]] = {} @@ -2517,6 +2528,14 @@ async def _handle_failed_task(self, task: Task) -> bool: self._completed_tasks.append(task) if task.id in self._assignees: await self._channel.archive_task(task.id) + # Invoke failure callback before halting + if self.on_subtask_failed is not None: + try: + self.on_subtask_failed(task) + except Exception as cb_err: + logger.warning( + f"on_subtask_failed callback raised: {cb_err}" + ) return True # If too many tasks are failing rapidly, also halt to prevent infinite @@ -2654,6 +2673,15 @@ async def _handle_failed_task(self, task: Task) -> bool: self._completed_tasks.append(task) return False + # Notify failure after bookkeeping, before scheduling next work + if self.on_subtask_failed is not None: + try: + self.on_subtask_failed(task) + except Exception as cb_err: + logger.warning( + f"on_subtask_failed callback raised: {cb_err}" + ) + logger.debug( f"Task {task.id} failed and was {action_taken}. " f"Updating dependency state." @@ -2757,6 +2785,14 @@ async def _handle_completed_task(self, task: Task) -> None: if task.id in self._assignees: await self._channel.archive_task(task.id) + if self.on_subtask_completed is not None: + try: + self.on_subtask_completed(task) + except Exception as cb_err: + logger.warning( + f"on_subtask_completed callback raised: {cb_err}" + ) + # Ensure it's in completed tasks set by updating if it exists or # appending if it's new. task_found_in_completed = False diff --git a/camel/toolkits/zoho_toolkit.py b/camel/toolkits/zoho_toolkit.py new file mode 100644 index 0000000000..22a88c249c --- /dev/null +++ b/camel/toolkits/zoho_toolkit.py @@ -0,0 +1,815 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union +from urllib.parse import quote_plus, urlencode + +import requests + +from camel.toolkits.base import BaseToolkit +from camel.toolkits.function_tool import FunctionTool + + +class ZohoMailToolkit(BaseToolkit): + r"""Zoho Mail Toolkit for CAMEL + + Args: + access_token (str): OAuth access token used for API requests. + account_id (str): Zoho Mail numeric account identifier. + datacenter (str, optional): Datacenter key in {"com", "eu", "in", + "au", "cn"}. (default: :obj:`"com"`) + """ + + # Map datacenter keys to accounts domain + DC_MAP: ClassVar[Dict[str, str]] = { + "com": "https://accounts.zoho.com", + "eu": "https://accounts.zoho.eu", + "in": "https://accounts.zoho.in", + "au": "https://accounts.zoho.com.au", + "cn": "https://accounts.zoho.com.cn", + } + + # Map datacenter keys to mail API domain + MAIL_DC_MAP: ClassVar[Dict[str, str]] = { + "com": "https://mail.zoho.com", + "eu": "https://mail.zoho.eu", + "in": "https://mail.zoho.in", + "au": "https://mail.zoho.com.au", + "cn": "https://mail.zoho.com.cn", + } + + def __init__( + self, access_token: str, account_id: str, datacenter: str = "com" + ): + r""" + Initialize toolkit with OAuth token and account context. + + Args: + access_token (str): OAuth access token. + + account_id (str): Zoho Mail numeric account identifier. + datacenter (str, optional): Datacenter key (default: :obj:`"com"`). + """ + self.access_token = access_token + self.account_id = account_id + self.datacenter = datacenter + self.base_url = self.MAIL_DC_MAP.get( + datacenter, self.MAIL_DC_MAP["com"] + ) + self.accounts_base = self.DC_MAP.get( + datacenter, self.DC_MAP["com"] + ) # accounts domain + + def _request_json( + self, + method: str, + url: str, + *, + params: Optional[Dict[str, Any]] = None, + json: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + r"""HTTP JSON request with Zoho OAuth header. + + Args: + method (str): HTTP method. + url (str): URL to request. + params (dict, optional): Query parameters. + json (dict, optional): JSON body. + """ + headers = { + "Authorization": f"Zoho-oauthtoken {self.access_token}", + "Content-Type": "application/json", + } + try: + response = requests.request( + method=method, + url=url, + headers=headers, + params=params, + json=json, + ) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + return { + "error": str(e), + "status_code": getattr(e.response, "status_code", None), + "response": getattr(e.response, "text", None), + "request_url": url, + "request_params": params, + "request_json": json, + } + + def _resolve_folder_id(self, folder_id_or_name: str) -> Optional[str]: + r"""Resolve human name like 'inbox' to folderId by listing folders. + + Args: + folder_id_or_name (str): Folder name or ID. + + Returns: + Optional[str]: Numeric folderId as string, or None if not found. + """ + if folder_id_or_name and str(folder_id_or_name).isdigit(): + return str(folder_id_or_name) + try: + url = f"{self.base_url}/api/accounts/{self.account_id}/folders" + resp = self._request_json("GET", url) + items = ( + resp + if isinstance(resp, list) + else resp.get("data") or resp.get("folders") or [] + ) + target = str(folder_id_or_name).strip().lower() + for f in items: + if not isinstance(f, dict): + continue + fid = f.get("folderId") or f.get("id") or f.get("folder_id") + names = [] + for key in ( + "folderName", + "displayName", + "name", + "systemFolder", + "folderType", + "type", + ): + v = f.get(key) + if isinstance(v, str): + names.append(v.strip().lower()) + if target in names and fid is not None: + return str(fid) + # Secondary: prefix match + for f in items: + if not isinstance(f, dict): + continue + fid = f.get("folderId") or f.get("id") + for key in ("folderName", "displayName", "name"): + v = f.get(key) + if ( + isinstance(v, str) + and v.strip().lower().startswith(target) + and fid is not None + ): + return str(fid) + except Exception: + return None + return None + + # --------------- + # Accounts helper + # --------------- + @staticmethod + def resolve_account_id_static( + dc: str, token: str, identifier: str + ) -> Tuple[str, str]: + r"""Resolve a Zoho Mail numeric account id and a default from email. + + Args: + dc (str): Datacenter key in {"com", "eu", "in", "au", "cn"}. + token (str): OAuth access token. + identifier (str): Numeric id or email address; empty to auto-pick. + + Returns: + Tuple[str, str]: (account_id, default_from_email). Empty strings + if not resolved. + """ + base_map = { + "com": "https://mail.zoho.com", + "eu": "https://mail.zoho.eu", + "in": "https://mail.zoho.in", + "au": "https://mail.zoho.com.au", + "cn": "https://mail.zoho.com.cn", + } + mail_base = base_map.get(dc, base_map["com"]) # default com + try: + resp = requests.get( + f"{mail_base}/api/accounts", + headers={"Authorization": f"Zoho-oauthtoken {token}"}, + timeout=15, + ) + resp.raise_for_status() + data = resp.json() + items = ( + data + if isinstance(data, list) + else data.get("data") or data.get("accounts") or [] + ) + if not items: + return "", "" + # Numeric id check + if identifier and identifier.isdigit(): + for acc in items: + acc_id = str( + acc.get("accountId") + or acc.get("id") + or acc.get("account_id") + or "" + ) + if acc_id == identifier: + emails: List[str] = [] + for key in ( + "emailAddress", + "primaryEmailAddress", + "email", + ): + v = acc.get(key) + if isinstance(v, str): + emails.append(v) + elif isinstance(v, list): + emails.extend( + [e for e in v if isinstance(e, str)] + ) + return identifier, (emails[0] if emails else "") + # Email match + if identifier and "@" in identifier: + target = identifier.lower() + for acc in items: + candidate_emails: List[str] = [] + for key in ( + "emailAddress", + "primaryEmailAddress", + "email", + ): + v = acc.get(key) + if isinstance(v, str): + candidate_emails.append(v.lower()) + elif isinstance(v, list): + for elem in v: + if isinstance(elem, str): + candidate_emails.append(elem.lower()) + elif isinstance(elem, dict): + for k in ("email", "address", "value"): + val = elem.get(k) + if isinstance(val, str): + candidate_emails.append( + val.lower() + ) + if target in candidate_emails: + acc_id = str( + acc.get("accountId") + or acc.get("id") + or acc.get("account_id") + or "" + ) + return acc_id, ( + candidate_emails[0] if candidate_emails else "" + ) + # Fallback: first account + acc = items[0] + acc_id = str( + acc.get("accountId") + or acc.get("id") + or acc.get("account_id") + or "" + ) + first_emails: List[str] = [] + for key in ("emailAddress", "primaryEmailAddress", "email"): + v = acc.get(key) + if isinstance(v, str): + first_emails.append(v) + elif isinstance(v, list): + first_emails.extend([e for e in v if isinstance(e, str)]) + return acc_id, (first_emails[0] if first_emails else "") + except requests.RequestException: + return "", "" + + # --------------- + # OAuth helpers + # --------------- + @staticmethod + def build_authorize_url( + *, + client_type: str, + client_id: str, + scope: str, + redirect_uri: Optional[str] = None, + access_type: str = "offline", + prompt: Optional[str] = "consent", + state: Optional[str] = None, + dc: str = "com", + extra_params: Optional[Dict[str, Any]] = None, + ) -> str: + r"""Build OAuth authorization URL for Zoho. + + Args: + client_type (str): One of "server", "client", "mobile", + "non-browser". + client_id (str): OAuth client id. + scope (str): Comma-separated scopes string. + redirect_uri (str, optional): Redirect URI registered in Zoho. + access_type (str, optional): "offline" or "online". + prompt (str, optional): Consent prompt behavior. + state (str, optional): Opaque state value. + dc (str, optional): Datacenter key (default: :obj:`"com"`). + extra_params (dict, optional): Extra query params to include. + + Returns: + str: Fully qualified authorize URL. + """ + if dc not in ZohoMailToolkit.DC_MAP: + supported = list(ZohoMailToolkit.DC_MAP.keys()) + raise ValueError( + f"Unknown datacenter '{dc}'. Supported: {supported}" + ) + base = ZohoMailToolkit.DC_MAP[dc] + + response_type_map = { + "server": "code", + "client": "token", + "mobile": "code", + "non-browser": "code", + } + client_type_lower = client_type.lower() + if client_type_lower not in response_type_map: + raise ValueError( + "Unsupported client_type. Use server|client|mobile|non-browser" + ) + response_type = response_type_map[client_type_lower] + if response_type in ("code", "token") and not redirect_uri: + raise ValueError("redirect_uri is required for this client type") + + params: Dict[str, Any] = { + "scope": scope, + "client_id": client_id, + "response_type": response_type, + "redirect_uri": redirect_uri, + "access_type": access_type, + } + if prompt: + params["prompt"] = prompt + if state: + params["state"] = state + if extra_params: + params.update(extra_params) + qs = urlencode( + {k: v for k, v in params.items() if v is not None}, + doseq=True, + quote_via=quote_plus, + ) + return f"{base}/oauth/v2/auth?{qs}" + + def exchange_code_for_token( + self, + *, + code: str, + client_id: str, + client_secret: str, + redirect_uri: str, + scope: Optional[str] = None, + ) -> Dict[str, Any]: + r"""Exchange authorization code for access (and refresh) token. + + Args: + code (str): Authorization code returned to the redirect URI. + client_id (str): OAuth client id. + client_secret (str): OAuth client secret. + redirect_uri (str): Redirect URI used during authorization. + scope (str, optional): Optional scopes to reiterate. + + Returns: + Dict[str, Any]: Token response payload. + """ + url = f"{self.accounts_base}/oauth/v2/token" + params: Dict[str, Any] = { + "code": code, + "grant_type": "authorization_code", + "client_id": client_id, + "client_secret": client_secret, + "redirect_uri": redirect_uri, + } + if scope: + params["scope"] = scope + return self._request_json("POST", url, params=params) + + def refresh_access_token( + self, + *, + refresh_token: str, + client_id: str, + client_secret: str, + scope: Optional[str] = None, + ) -> Dict[str, Any]: + r"""Refresh an access token via a refresh token. + + Args: + refresh_token (str): Valid refresh token. + client_id (str): OAuth client id. + client_secret (str): OAuth client secret. + scope (str, optional): Optional scopes to reiterate. + + Returns: + Dict[str, Any]: Token response payload. + """ + url = f"{self.accounts_base}/oauth/v2/token" + params: Dict[str, Any] = { + "refresh_token": refresh_token, + "grant_type": "refresh_token", + "client_id": client_id, + "client_secret": client_secret, + } + if scope: + params["scope"] = scope + return self._request_json("POST", url, params=params) + + def revoke_token(self, token: str) -> Dict[str, Any]: + r"""Revoke a refresh or access token. + + Args: + token (str): Refresh or access token to revoke. + + Returns: + Dict[str, Any]: Revocation status payload. + """ + url = f"{self.accounts_base}/oauth/v2/token/revoke" + params = {"token": token} + resp = self._request_json("POST", url, params=params) + if "error" not in resp: + return {"status_code": 200, "ok": True} + return resp + + def _join_recipients( + self, recipients: Optional[Union[List[str], List[Dict[str, Any]]]] + ) -> Optional[str]: + r"""Convert list of recipients to comma-separated string per Zoho API. + + Args: + recipients (Optional[Union[List[str], List[Dict[str, Any]]]]): + List of recipients. + + Returns: + Optional[str]: Comma-separated string of recipients. + """ + if not recipients: + return None + emails: List[str] = [] + for recipient in recipients: + if isinstance(recipient, str): + emails.append(recipient) + elif isinstance(recipient, dict) and recipient.get("email"): + emails.append(str(recipient["email"])) + return ",".join(emails) if emails else None + + # --------------- + # Actions + # --------------- + + def _add_recipients( + self, + payload: Dict[str, Any], + *, + to: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + cc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + bcc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + ) -> None: + r"""Helper to append recipient fields to payload if present. + + Args: + payload (Dict[str, Any]): Payload to append recipients to. + to (Optional[Union[List[str], List[Dict[str, Any]]]]): + To recipients. + cc (Optional[Union[List[str], List[Dict[str, Any]]]]): + CC recipients. + bcc (Optional[Union[List[str], List[Dict[str, Any]]]]): + BCC recipients. + """ + to_str = self._join_recipients(to) + if to_str: + payload["toAddress"] = to_str + cc_str = self._join_recipients(cc) + if cc_str: + payload["ccAddress"] = cc_str + bcc_str = self._join_recipients(bcc) + if bcc_str: + payload["bccAddress"] = bcc_str + + def send_email( + self, + to: Union[List[str], List[Dict[str, Any]]], + subject: str, + content: str, + from_email: Optional[str] = None, + cc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + bcc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + is_html: bool = True, + encoding: str = "UTF-8", + ) -> Dict[str, Any]: + r""" + Action: Send Email (OAuth Scope: ZohoMail.messages) + + Args: + to (Union[List[str], List[Dict[str, Any]]]): + List of recipients (email strings or dicts with 'email'). + subject: Email subject + content: Email content/body + from_email: Sender email (optional, uses default if not provided) + cc: CC recipients (optional) + bcc: BCC recipients (optional) + is_html: Whether content is HTML (default: True) + encoding: Email encoding (default: UTF-8) + + Returns: + API response as dictionary + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/messages" + + # Convert recipients to comma-separated string as required by + # Zoho Mail API + + payload = { + "fromAddress": from_email + or f"noreply@{self.account_id.split('.')[0]}.zohomail.com", + "subject": subject, + "content": content, + "mailFormat": "html" if is_html else "plaintext", + "encoding": encoding, + } + + self._add_recipients(payload, to=to, cc=cc, bcc=bcc) + return self._request_json("POST", url, json=payload) + + def send_template_email( + self, + to: Union[List[str], List[Dict[str, Any]]], + template_id: str, + template_data: Optional[Dict[str, Any]] = None, + from_email: Optional[str] = None, + cc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + bcc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + subject: Optional[str] = None, + ) -> Dict[str, Any]: + r""" + Action: Send Template Mail (OAuth Scope: ZohoMail.messages) + + Args: + to (Union[List[str], List[Dict[str, Any]]]): + List of recipients (EmailRecipient objects or email strings). + template_id: Template ID to use + template_data: Data to populate template variables (optional) + from_email: Sender email (optional) + cc: CC recipients (optional) + bcc: BCC recipients (optional) + subject: Email subject (optional, uses template default if + not provided) + + Returns: + API response as dictionary + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/messages" + + # Convert recipients to comma-separated string as required by + # Zoho Mail API + + payload = { + "fromAddress": from_email + or f"noreply@{self.account_id.split('.')[0]}.zohomail.com", + "templateId": template_id, + "mailFormat": "html", + } + + self._add_recipients(payload, to=to, cc=cc, bcc=bcc) + if subject: + payload["subject"] = subject + if template_data: + payload["templateData"] = template_data + return self._request_json("POST", url, json=payload) + + def create_draft( + self, + to: Union[List[str], List[Dict[str, Any]]], + subject: str, + content: str, + from_email: Optional[str] = None, + cc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + bcc: Optional[Union[List[str], List[Dict[str, Any]]]] = None, + is_html: bool = True, + encoding: str = "UTF-8", + mode: str = "draft", + ) -> Dict[str, Any]: + r""" + Action: Create Draft (OAuth Scope: ZohoMail.messages) + + Args: + to: List of recipients (EmailRecipient objects or email strings) + subject: Email subject + content: Email content/body + from_email: Sender email (optional) + cc: CC recipients (optional) + bcc: BCC recipients (optional) + is_html: Whether content is HTML (default: True) + encoding: Email encoding (default: UTF-8) + + Returns: + API response as dictionary + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/messages" + + # Convert recipients to comma-separated string as required by + # Zoho Mail API + + payload: Dict[str, Any] = { + "mode": "template" if str(mode).lower() == "template" else "draft", + "subject": subject, + "content": content, + "mailFormat": "html" if is_html else "plaintext", + } + # Only include fromAddress if explicitly provided (some orgs + # restrict this field) + if from_email: + payload["fromAddress"] = from_email + # Encoding is optional; include only if explicitly set + if encoding: + payload["encoding"] = encoding + self._add_recipients(payload, to=to, cc=cc, bcc=bcc) + return self._request_json("POST", url, json=payload) + + def create_folder( + self, name: str, parent_id: Optional[Union[str, int]] = None + ) -> Dict[str, Any]: + r""" + Action: Create Folder (OAuth Scope: ZohoMail.messages) + + Args: + name (str): Folder name. + parent_id (Optional[Union[str, int]]): Parent folder ID (optional). + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/folders" + payload: Dict[str, Any] = {"folderName": name} + if parent_id is not None: + payload["parentId"] = str(parent_id) + return self._request_json("POST", url, json=payload) + + def create_tag( + self, name: str, color: Optional[str] = None + ) -> Dict[str, Any]: + r"""Action: Create Tag (OAuth Scope: ZohoMail.labels) + + Creates a tag/label for emails. + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/labels" + # Per API, expected keys are displayName (required) and optional + # color (hex) + payload: Dict[str, Any] = {"displayName": name} + if color: + payload["color"] = color + return self._request_json("POST", url, json=payload) + + def create_task( + self, + title: str, + description: Optional[str] = None, + due_in_epoch_ms: Optional[int] = None, + ) -> Dict[str, Any]: + r"""Action: Create Task (OAuth Scope: ZohoMail.tasks ) + + Creates a task in Zoho Mail Tasks. + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/tasks" + payload: Dict[str, Any] = {"title": title} + if description: + payload["description"] = description + if due_in_epoch_ms is not None: + payload["dueDateInMillis"] = int(due_in_epoch_ms) + return self._request_json("POST", url, json=payload) + + # --------------- + # Triggers (Polling) + # --------------- + def trigger_new_email_poll( + self, + folder_id: str, + since_epoch_seconds: Optional[int] = None, + limit: int = 20, + sort_order: str = "desc", + ) -> Dict[str, Any]: + r"""Trigger: New Email (Polling) + + Retrieves recent emails from a folder. Optionally filter + client-side using since_epoch_seconds. + + Args: + folder_id (str): Folder ID. + since_epoch_seconds (Optional[int]): Since epoch seconds. + limit (int): Limit. + sort_order (str): Sort order. + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/messages/view" + resolved_folder_id = self._resolve_folder_id(folder_id) + # Zoho expects sortorder boolean: true (asc), false (desc). + # Default start is 1. + params: Dict[str, Any] = { + "folderId": resolved_folder_id or folder_id, + "start": 1, + "limit": limit, + "sortorder": True + if str(sort_order).lower() in ("asc", "true", "1") + else False, + "sortBy": "date", + } + resp = self._request_json("GET", url, params=params) + if ( + since_epoch_seconds + and isinstance(resp, dict) + and isinstance(resp.get("data"), list) + ): + threshold_ms = int(since_epoch_seconds) * 1000 + filtered = [ + m + for m in resp["data"] + if isinstance(m, dict) + and int(m.get("receivedTimeInMillis", 0)) >= threshold_ms + ] + resp = {**resp, "data": filtered} + return resp + + def trigger_new_email_matching_search_poll( + self, + query: str, + folder_id: Optional[str] = None, + start: int = 1, + limit: int = 20, + include_to: bool = False, + received_time_ms: Optional[int] = None, + ) -> Dict[str, Any]: + r"""Trigger: New Email Matching Search (Polling) + + Retrieves emails based on a search query. + + Args: + query (str): Search query. + folder_id (Optional[str]): Folder ID. + start (int): Start. + limit (int): Limit. + include_to (bool): Include to. + received_time_ms (Optional[int]): Received time milliseconds. + """ + url = f"{self.base_url}/api/accounts/{self.account_id}/messages/search" + # Zoho expects 'searchKey' instead of 'query'. + params: Dict[str, Any] = { + "searchKey": query, + "start": start, + "limit": limit, + } + if include_to: + params["includeto"] = True + if received_time_ms is not None: + params["receivedTime"] = int(received_time_ms) + # Note: folderId is not supported in search API; omit to avoid + # EXTRA_PARAM_FOUND + return self._request_json("GET", url, params=params) + + def trigger_new_tagged_email_poll( + self, + tag_id: Union[str, int], + folder_id: Optional[str] = None, + start: int = 0, + limit: int = 20, + ) -> Dict[str, Any]: + r"""Trigger: New Tagged Email (Polling) + + Retrieves emails that have the specified tag/label. + + Args: + tag_id (Union[str, int]): Tag ID. + folder_id (Optional[str]): Folder ID. + start (int): Start. + limit (int): Limit. + + Returns: + API response as dictionary + """ + # Use search with label filter if available; fallback to folder + # filter plus client-side tag check if needed + url = f"{self.base_url}/api/accounts/{self.account_id}/messages/search" + query = f"label:{tag_id}" + params: Dict[str, Any] = { + "query": query, + "start": start, + "limit": limit, + } + if folder_id: + resolved_folder_id = self._resolve_folder_id(folder_id) + params["folderId"] = resolved_folder_id or folder_id + return self._request_json("GET", url, params=params) + + def get_tools(self) -> List[FunctionTool]: + return [ + FunctionTool(self.send_email), + FunctionTool(self.send_template_email), + FunctionTool(self.create_draft), + FunctionTool(self.create_folder), + FunctionTool(self.create_tag), + FunctionTool(self.create_task), + FunctionTool(self.trigger_new_email_poll), + FunctionTool(self.trigger_new_email_matching_search_poll), + FunctionTool(self.trigger_new_tagged_email_poll), + ] diff --git a/examples/toolkits/zoho_toolkit.py b/examples/toolkits/zoho_toolkit.py new file mode 100644 index 0000000000..12ac735173 --- /dev/null +++ b/examples/toolkits/zoho_toolkit.py @@ -0,0 +1,224 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import json +import os +import sys +import threading +import time +import webbrowser +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import parse_qs, urlparse + +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) +from camel.toolkits.zoho_toolkit import ZohoMailToolkit + + +def main(): + datacenter = os.getenv("ZOHO_DATACENTER", "com") + client_id = ( + os.getenv("ZOHO_CLIENT_ID") or input("Enter ZOHO_CLIENT_ID: ").strip() + ) + client_secret = ( + os.getenv("ZOHO_CLIENT_SECRET") + or input("Enter ZOHO_CLIENT_SECRET: ").strip() + ) + redirect_uri = os.getenv("ZOHO_REDIRECT_URI", "").strip() + scopes = os.getenv( + "ZOHO_SCOPES", + "ZohoMail.messages.ALL,ZohoMail.accounts.ALL,ZohoMail.folders.ALL,ZohoMail.tags.ALL", + ) + + # If no redirect URI provided, start a local redirect server and use it + holder = {"code": None, "location": None, "accounts_server": None} + server_thread = None + if not redirect_uri: + redirect_host = "127.0.0.1" + redirect_port = 8000 + redirect_path = "/callback" + redirect_uri = f"http://{redirect_host}:{redirect_port}{redirect_path}" + + class Handler(BaseHTTPRequestHandler): + def do_GET(self): + parsed = urlparse(self.path) + if parsed.path == redirect_path: + params = parse_qs(parsed.query) + holder["code"] = params.get("code", [None])[0] + holder["location"] = params.get("location", [None])[0] + holder["accounts_server"] = params.get( + "accounts-server", [None] + )[0] + self.send_response(200) + self.send_header( + "Content-Type", "text/html; charset=utf-8" + ) + self.end_headers() + self.wfile.write( + b"Authorization received. You can close this tab." # noqa: E501 + ) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, fmt, *args): + return + + def run_server(): + httpd = HTTPServer((redirect_host, redirect_port), Handler) + try: + httpd.handle_request() + except Exception: + pass + + server_thread = threading.Thread(target=run_server, daemon=True) + server_thread.start() + + auth_url = ZohoMailToolkit.build_authorize_url( + client_type="server", + client_id=client_id, + scope=scopes, + redirect_uri=redirect_uri, + access_type="offline", + dc=datacenter, + prompt="consent", + ) + print( + "\nOpen this URL in your browser and authorize, then paste the 'code':" + ) + print(auth_url) + try: + webbrowser.open(auth_url) + except Exception: + pass + code = None + if server_thread is not None: + # wait up to 180 seconds for redirect + for _ in range(180): + if holder["code"]: + code = holder["code"] + break + time.sleep(1) + + temp = ZohoMailToolkit(access_token="", account_id="", datacenter="in") + token_resp = temp.exchange_code_for_token( + code=code, + client_id=client_id, + client_secret=client_secret, + redirect_uri=redirect_uri, + scope=scopes, + ) + print("\nToken response:") + print(json.dumps(token_resp, indent=2)) + access_token = token_resp.get("access_token") or "" + if not access_token: + print("Failed to obtain access token.") + return + + identifier = os.getenv("ZOHO_ACCOUNT_ID", "").strip() + if not identifier: + identifier = input( + "Enter ZOHO_ACCOUNT_ID (numeric or email, leave blank to auto): " + ).strip() + if identifier.isdigit(): + account_id, default_from_email = identifier, "" + else: + account_id, default_from_email = ( + ZohoMailToolkit.resolve_account_id_static( + datacenter, access_token, identifier + ) + ) + if not account_id or not account_id.isdigit(): + print("Failed to resolve numeric ZOHO_ACCOUNT_ID.") + return + toolkit = ZohoMailToolkit( + access_token=access_token, account_id=account_id, datacenter=datacenter + ) + + # Action: Send Email + to_email = ( + os.getenv("ZOHO_TO_ADDRESS") + or input("Enter recipient email: ").strip() + ) + from_email = os.getenv("ZOHO_FROM_ADDRESS", "") or ( + default_from_email or None + ) + send_resp = toolkit.send_email( + to=[to_email], + subject="Zoho Mail - Send Email (CAMEL)", + content="This is a test email sent via ZohoMailToolkit (focused).", + is_html=False, + from_email=from_email, + ) + print("\nSend Email response:") + print(json.dumps(send_resp, indent=2)) + + # Action: Create Draft + draft_resp = toolkit.create_draft( + to=[to_email], + subject="Draft from CAMEL", + content="This is a draft created via ZohoMailToolkit.", + is_html=False, + from_email=from_email, + ) + print("\nCreate Draft response:") + print(json.dumps(draft_resp, indent=2)) + + # Action: Create Folder + folder_resp = toolkit.create_folder(name="CAMEL_Demo") + print("\nCreate Folder response:") + print(json.dumps(folder_resp, indent=2)) + created_folder_id = None + if isinstance(folder_resp, dict): + created_folder_id = folder_resp.get("folderId") or folder_resp.get( + "id" + ) + + # Action: Create Tag + tag_resp = toolkit.create_tag(name="CAMEL_Tag") + print("\nCreate Tag response:") + print(json.dumps(tag_resp, indent=2)) + created_tag_id = None + if isinstance(tag_resp, dict): + created_tag_id = tag_resp.get("labelId") or tag_resp.get("id") + + # Action: Create Task + task_resp = toolkit.create_task( + title="Follow up", description="Call customer", due_in_epoch_ms=None + ) + print("\nCreate Task response:") + print(json.dumps(task_resp, indent=2)) + + # Triggers (Polling) + print("\nTrigger: New Email (Polling)") + poll_resp = toolkit.trigger_new_email_poll( + folder_id=str(created_folder_id) if created_folder_id else "inbox", + limit=10, + ) + print(json.dumps(poll_resp, indent=2)) + + print("\nTrigger: New Email Matching Search (Polling)") + search_resp = toolkit.trigger_new_email_matching_search_poll( + query=f'from:"{to_email}"', folder_id="inbox", limit=10 + ) + print(json.dumps(search_resp, indent=2)) + + if created_tag_id: + print("\nTrigger: New Tagged Email (Polling)") + tagged_resp = toolkit.trigger_new_tagged_email_poll( + tag_id=str(created_tag_id), folder_id="inbox", limit=10 + ) + print(json.dumps(tagged_resp, indent=2)) + + +if __name__ == "__main__": + main() From 761b240a772323d5d578299789521f4ee29f4a15 Mon Sep 17 00:00:00 2001 From: Saedbhati Date: Fri, 26 Sep 2025 18:38:03 +0530 Subject: [PATCH 2/2] update --- camel/societies/workforce/workforce.py | 36 -------------------------- 1 file changed, 36 deletions(-) diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index e3b730c1af..0d612e8497 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -197,12 +197,6 @@ class Workforce(BaseNode): support native structured output. When disabled, the workforce uses the native response_format parameter. (default: :obj:`True`) - on_subtask_completed (Optional[Callable[[Task], None]], optional): - Callback function to be called when a subtask is completed. - (default: :obj:`None`) - on_subtask_failed (Optional[Callable[[Task], None]], optional): - Callback function to be called when a subtask fails. - (default: :obj:`None`) Example: >>> import asyncio @@ -255,8 +249,6 @@ def __init__( share_memory: bool = False, use_structured_output_handler: bool = True, task_timeout_seconds: Optional[float] = None, - on_subtask_completed: Optional[Callable[[Task], None]] = None, - on_subtask_failed: Optional[Callable[[Task], None]] = None, ) -> None: super().__init__(description) self._child_listening_tasks: Deque[ @@ -273,9 +265,6 @@ def __init__( if self.use_structured_output_handler: self.structured_handler = StructuredOutputHandler() self.metrics_logger = WorkforceLogger(workforce_id=self.node_id) - # Optional user callbacks for subtask lifecycle notifications - self.on_subtask_completed = on_subtask_completed - self.on_subtask_failed = on_subtask_failed self._task: Optional[Task] = None self._pending_tasks: Deque[Task] = deque() self._task_dependencies: Dict[str, List[str]] = {} @@ -2528,14 +2517,6 @@ async def _handle_failed_task(self, task: Task) -> bool: self._completed_tasks.append(task) if task.id in self._assignees: await self._channel.archive_task(task.id) - # Invoke failure callback before halting - if self.on_subtask_failed is not None: - try: - self.on_subtask_failed(task) - except Exception as cb_err: - logger.warning( - f"on_subtask_failed callback raised: {cb_err}" - ) return True # If too many tasks are failing rapidly, also halt to prevent infinite @@ -2673,15 +2654,6 @@ async def _handle_failed_task(self, task: Task) -> bool: self._completed_tasks.append(task) return False - # Notify failure after bookkeeping, before scheduling next work - if self.on_subtask_failed is not None: - try: - self.on_subtask_failed(task) - except Exception as cb_err: - logger.warning( - f"on_subtask_failed callback raised: {cb_err}" - ) - logger.debug( f"Task {task.id} failed and was {action_taken}. " f"Updating dependency state." @@ -2785,14 +2757,6 @@ async def _handle_completed_task(self, task: Task) -> None: if task.id in self._assignees: await self._channel.archive_task(task.id) - if self.on_subtask_completed is not None: - try: - self.on_subtask_completed(task) - except Exception as cb_err: - logger.warning( - f"on_subtask_completed callback raised: {cb_err}" - ) - # Ensure it's in completed tasks set by updating if it exists or # appending if it's new. task_found_in_completed = False