diff --git a/nextpy/ai/agent/assistant_agent.py b/nextpy/ai/agent/assistant_agent.py new file mode 100644 index 00000000..6b1425ef --- /dev/null +++ b/nextpy/ai/agent/assistant_agent.py @@ -0,0 +1,214 @@ +from typing import Any, Callable, Tuple +from nextpy.ai.agent.base_agent import BaseAgent +import logging +from pathlib import Path +from nextpy.ai import engine +import inspect +import asyncio +import logging + + +def _call_functions(functions): + for function, arguments, keyword_args in functions: + if inspect.iscoroutinefunction(function): + try: + other_loop = asyncio.get_event_loop() + import nest_asyncio + + nest_asyncio.apply(other_loop) + except RuntimeError: + pass + loop = asyncio.new_event_loop() + loop.run_until_complete(function(*arguments, **keyword_args)) + else: + function(*arguments, **keyword_args) + + +async def _a_call_functions(functions): + for function, arguments, keyword_args in functions: + if inspect.iscoroutinefunction(function): + await function(*arguments, **keyword_args) + else: + function(*arguments, **keyword_args) + + +class AssistantAgent(BaseAgent): + """ + AssistantAgent class represents an assistant agent that interacts with users in a conversational manner. + + :param name: The name of the assistant agent. + :type name: str + :param llm: The language model used by the assistant agent. + :type llm: LanguageModel + :param memory: The memory used by the assistant agent. + :type memory: Memory + :param async_mode: Whether the assistant agent should run in asynchronous mode or not. Default is True. + :type async_mode: bool, optional + :param system_message: The system message included in the prompt. Default is None. + :type system_message: str, optional + :param functions_before_call: List of functions to be called before the main function call. Default is None. + :type functions_before_call: List[Callable], optional + :param functions_after_call: List of functions to be called after the main function call. Default is None. + :type functions_after_call: List[Callable], optional + + The assistant agent is built on top of the existing BaseAgent and serves as a simple interface for creating an AI assistant agent. + It provides a convenient way to define an AI assistant agent that can interact with users in a conversational manner. + The assistant agent can be customized with a name, language model, memory, and other parameters. + It also supports asynchronous mode, allowing it to handle multiple conversations simultaneously. + + MultiagentManager can be used to manage multiple assistant agents and coordinate their interactions with users. + + + Example: + + + tailwind_agent = AssistantAgent(name='Tailwind Class Generator', llm=llm, memory=None, async_mode=False, + system_message='''automates the creation of Tailwind CSS classes, streamlining the process of building stylish and responsive user interfaces. By leveraging advanced algorithms and design principles, the Tailwind Class Generator analyzes your design elements and dynamically generates the optimal set of Tailwind utility classes. + This tool is designed to enhance efficiency in web development, allowing developers to focus more on high-level design decisions and less on manually crafting individual CSS rules. With the Tailwind Class Generator, achieving a visually appealing and consistent design becomes a seamless experience. + ''' + ) + """ + + DEFAULT_PROMPT = ''' + {{#system~}} {{name}}, you are working in the following team :{{agents}} + {{~/system}} + + {{#user~}} + Read the following CONVERSATION : + {{messages}} + Respond as {{name}}. Do not thank any team member or show appreciation." + {{~/user}} + + {{#assistant~}} + {{gen 'answer' temperature=0 max_tokens=500}} + {{~/assistant}} + ''' + + def __init__(self, + name, + llm=None, + memory=None, + async_mode: bool = False, + system_message: str | None = None, + custom_engine=None, + functions_before_call: Tuple[Callable, + Tuple[Any], Tuple[Any]] | None = None, + functions_after_call: Tuple[Callable, + Tuple[Any], Tuple[Any]] | None = None, + description: str = "Helpful AI Assistant Agent", + **kwargs): + """ + Initializes an instance of the AssistantAgent class. + + :param name: The name of the assistant agent. + :type name: str + :param llm: The language model used by the assistant agent. + :type llm: LanguageModel + :param memory: The memory used by the assistant agent. + :type memory: Memory + :param async_mode: Whether the assistant agent should run in asynchronous mode or not. Default is True. + :type async_mode: bool, optional + :param system_message: The system message to be displayed to the user. Default is None. + :type system_message: str, optional + :param engine: The engine used by the assistant agent. Either llm or engine must be provided. + :type engine: Engine, optional + :param functions_before_call: List of functions, args and kwargs, to be called before the main function call. Default is None. + :type functions_before_call: List[Callable], optional + :param functions_after_call: List of functions, args and kwargs to be called after the main function call. Default is None. + :type functions_after_call: List[Callable], optional + :param kwargs: Additional keyword arguments. + """ + super().__init__(llm=llm, **kwargs) + self.name = name + self.prompt = self.DEFAULT_PROMPT + self.system_message = system_message + # This is used by multiagent manager to determine whether to use receive or a_receive + self.async_mode = async_mode + + if system_message is not None: + try: + system_message = Path(system_message).read_text() + except Exception: + pass + self.prompt = self.prompt[:self.prompt.find( + '{{~/system}}')] + system_message + self.prompt[self.prompt.find('{{~/system}}'):] + + # Either llm or engine must be provided + if llm is not None or engine is not None: + logging.debug("Warning! Either llm or engine must be provided.") + + self.engine = custom_engine if custom_engine is not None else engine( + template=self.prompt, llm=llm, memory=memory, async_mode=async_mode, **kwargs) + self.output_key = 'answer' + self.functions_before_call = functions_before_call + self.functions_after_call = functions_after_call + self.description = description + + @staticmethod + def function_call_decorator(func): + """ + Decorator function that wraps the main function call with additional functions to be called before and after. + + :param func: The main function to be called. + :type func: Callable + :return: The wrapped function. + :rtype: Callable + """ + if inspect.iscoroutinefunction(func): + async def a_inner(self, *args, **kwargs): + if self.functions_before_call is not None: + await _a_call_functions(self.functions_before_call) + + result = await func(self, *args, **kwargs) + + if self.functions_after_call is not None: + await _a_call_functions(self.functions_after_call) + + return result + return a_inner + else: + def inner(self, *args, **kwargs): + if self.functions_before_call is not None: + _call_functions(self.functions_before_call) + + result = func(self, *args, **kwargs) + + if self.functions_after_call is not None: + _call_functions(self.functions_after_call) + + return result + return inner + + @function_call_decorator + def receive(self, agents, messages, termination_message): + """ + Receives messages from other agents and generates a response. + + :param agents: The list of agents involved in the conversation. + :type agents: List[str] + :param messages: The list of messages in the conversation. + :type messages: List[str] + :param termination_message: The termination message for the conversation. + :type termination_message: str + :return: The generated response. + :rtype: str + """ + output = self.run(agents=agents, messages=messages, name=self.name) + return output + + @function_call_decorator + async def a_receive(self, agents, messages, termination_message): + """ + Asynchronously receives messages from other agents and generates a response. + + :param agents: The list of agents involved in the conversation. + :type agents: List[str] + :param messages: The list of messages in the conversation. + :type messages: List[str] + :param termination_message: The termination message for the conversation. + :type termination_message: str + :return: The generated response. + :rtype: str + """ + output = await self.arun(agents=agents, messages=messages, name=self.name) + return output diff --git a/nextpy/ai/agent/multiagent_manager.py b/nextpy/ai/agent/multiagent_manager.py new file mode 100644 index 00000000..48faf095 --- /dev/null +++ b/nextpy/ai/agent/multiagent_manager.py @@ -0,0 +1,261 @@ +from typing import Tuple, List, Any +from nextpy.ai.agent.assistant_agent import AssistantAgent +from nextpy.ai import engine + + +class MultiAgentManager: + """ + A class that manages multiple agents in a role-playing game. + + Attributes: + DEFAULT_PROMPT (str): The default prompt for the game. + SOLUTION_PROMPT (str): The prompt for generating the final solution. + agents (Tuple[AssistantAgent]): A tuple of AssistantAgent objects representing the participating agents. + messages (List[Any]): A list of messages exchanged between the agents and the user. + termination_message (str): The termination message indicating the end of the game. + error_message (str): The error message indicating an error in the game. + mode (str): The mode of the game (e.g., 'BROADCAST', 'ROUND_ROBIN'). + rounds (int): The number of rounds to play. + round_robin (bool): A flag indicating whether to use round-robin mode. + llm: The language model used by the agents. + memory: The memory used by the agents. + async_mode (bool): A flag indicating whether to use asynchronous mode. + debug_mode (bool): A flag indicating whether to enable debug mode. + """ + DEFAULT_PROMPT = ''' + {{#system~}} You are playing a role playing game with the following participants : \n{{agents}}{{~/system}} + + {{#user~}} + Read the following conversation and choose who the next speaker will be: + {{messages}} + Simply respond with the NAME of the next speaker without any other characters such as numbers or punctuations. + {{~/user}} + + {{#assistant~}} + {{gen 'answer' temperature=0 max_tokens=500}} + {{~/assistant}} + ''' + + SOLUTION_PROMPT = ''' + {{#system~}} You are a helpful and terse AI assistant{{~/system}} + + {{#user~}} + Read the following conversation: + {{messages}} + Now generate the final solution to the User's query. + {{~/user}} + + {{#assistant~}} + {{gen 'answer' temperature=0 max_tokens=500}} + {{~/assistant}} + ''' + + def __init__(self, + agents: Tuple[AssistantAgent], + messages: List[Any] | None = None, + termination_message: str = 'TERMINATE SUCCESSFULLY', + error_message: str = 'ERROR', + mode: str = 'BROADCAST', + rounds: int = 5, + round_robin: bool = True, + llm=None, + memory=None, + async_mode=False, + debug_mode=False): + + if messages is None: + messages = [] + + self.debug_mode = debug_mode + + if not any([isinstance(agent, AssistantAgent) + for agent in agents]): + self.DEFAULT_PROMPT = self.DEFAULT_PROMPT[:self.DEFAULT_PROMPT.find( + '{{~/system}}')] + '\nNote, User is also a participant, you can choose User.' + self.DEFAULT_PROMPT[self.DEFAULT_PROMPT.find('{{~/system}}'):] + else: + self.DEFAULT_PROMPT = self.DEFAULT_PROMPT[:self.DEFAULT_PROMPT.find( + '{{~/system}}')] + '\nNote, User is not a participant, you cannot choose User.' + self.DEFAULT_PROMPT[self.DEFAULT_PROMPT.find('{{~/system}}'):] + self.engine = engine( + self.DEFAULT_PROMPT, llm=llm, memory=memory, async_mode=async_mode) + self.solution_summarizer = engine( + self.SOLUTION_PROMPT, llm=llm, memory=memory, async_mode=async_mode) + + self.agents = agents + self.agent_dict = {agent.name: agent for agent in agents} + self.messages = messages + self.termination_message = termination_message + self.error_message = error_message + self.mode = mode + self.rounds = rounds + self.round_robin = round_robin + self.current_agent = 0 # Used to keep track of next agent in sequence + + @property + def agent_string(self): + """ + Returns a string representation of all the agent names separated by commas. + """ + return '\n\n'.join([f'NAME: {agent.name}\n DESC: {agent.description}' for agent in self.agents]) + + def run_sequence(self, context): + """ + Runs the sequence of agents in the multi-agent system. + + Args: + context: The context for the current round. + + Returns: + A list of messages exchanged between agents during the sequence. + """ + self.messages.append(['User', context]) + while self.rounds != 0 and not self._termination_message_received(): + if self.debug_mode: + print( + f'{"-"*5}Messaging next agent : {self.agents[self.current_agent].name}{"-"*5}\n\n') + + self._message_next_agent() + + if self.debug_mode: + print(f'{self.messages[-1][0]}\n\n{self.messages[-1][1]}') + + if self.current_agent == 0 and not self.round_robin: + break + + self.rounds -= 1 + return self.messages + + async def a_run_sequence(self, context): + """ + Runs the sequence of agents in the multi-agent system in async. + + Args: + context: The context for the current round. + + Returns: + A list of messages exchanged between agents during the sequence. + """ + self.messages.append(['User', context]) + while self.rounds != 0 and not self._termination_message_received(): + if self.debug_mode: + print( + f'{"-"*5}Messaging next agent : {self.agents[self.current_agent].name}{"-"*5}\n\n') + + await self._a_message_next_agent() + if self.debug_mode: + print( + f'{self.messages[-1][0]}\n\n{self.messages[-1][1]}') + + if self.current_agent == 0 and not self.round_robin: + break + + self.rounds -= 1 + return self.messages + + def run_auto(self, context): + """ + Runs the multi-agent manager in auto mode. + + Args: + context: The context for the multi-agent manager. + + Returns: + A list containing the messages exchanged between agents and the final solution. + """ + self.messages.append(['User', context]) + while self.rounds != 0 and not self._termination_message_received(): + next_agent = self._choose_next_agent() + if self.debug_mode: + print( + f'{"-" * 5}Messaging next agent : {next_agent.name}{"-" * 5}\n\n') + + self._message_next_agent(next_agent) + if self.debug_mode: + print( + f'{self.messages[-1][0]}\n\n{self.messages[-1][1]}') + + self.rounds -= 1 + final_solution = self.solution_summarizer( + messages=self._parse_messages()).get('answer') + + if self.debug_mode: + print(final_solution) + + return [self.messages, final_solution] + + async def _a_message_next_agent(self, next_agent=None): + """ + Sends a message to the next agent in the list and receives a response. + + Args: + next_agent (Agent, optional): The next agent to send the message to. If not provided, + the next agent in the list will be selected. Defaults to None. + + Returns: + None + """ + if next_agent is None: + next_agent = self.agents[self.current_agent] + self.current_agent = (self.current_agent + 1) % len(self.agents) + + if next_agent.async_mode: + received_message = await next_agent.a_receive( + self.agent_string, self._parse_messages(), self.termination_message) + else: + received_message = next_agent.receive( + self.agent_string, self._parse_messages(), self.termination_message) + + self.messages.append([next_agent.name, received_message]) + + def _message_next_agent(self, next_agent=None): + """ + Sends a message to the next agent in the sequence and receives a response. + + Args: + next_agent (Agent, optional): The next agent to send the message to. If None, the next agent in the sequence is used. + + Returns: + None + """ + + if next_agent is None: + next_agent = self.agents[self.current_agent] + self.current_agent = (self.current_agent + 1) % len(self.agents) + + assert not next_agent.async_mode, "Don't use run_sequence for async agents, use a_run_sequence instead" + + received_message = next_agent.receive( + self.agent_string, self._parse_messages(), self.termination_message) + + self.messages.append([next_agent.name, received_message]) + + def _termination_message_received(self): + """ + Checks if the termination message is present in the last received message. + + Returns: + bool: True if the termination message is present, False otherwise. + """ + return self.termination_message in self.messages[-1][1] + + def _parse_messages(self): + """ + Parses the messages stored in the `self.messages` list and returns a formatted string. + + Returns: + str: A formatted string containing the parsed messages. + """ + return f'\n\n{"-"*20}'.join([f'{index}) {message[0]}\n{message[1]}' for index, message in enumerate(self.messages)]) + + def _choose_next_agent(self): + """ + Chooses the next agent based on the output of the engine. + + Returns: + The next agent to be used. + + """ + output = self.engine(agents=self.agent_string, + messages=self._parse_messages()) + if self.debug_mode: + print(f"Chosen next agent as {output.get('answer')}") + return self.agent_dict[output.get('answer')] diff --git a/nextpy/ai/agent/userproxy_agent.py b/nextpy/ai/agent/userproxy_agent.py new file mode 100644 index 00000000..5297a128 --- /dev/null +++ b/nextpy/ai/agent/userproxy_agent.py @@ -0,0 +1,79 @@ +from nextpy.ai.agent.assistant_agent import AssistantAgent +from typing import Any, Tuple, Callable + + +class UserProxyAgent(AssistantAgent): + """ + Initializes a UserProxyAgent instance. + + :param async_mode: Indicates if the agent should operate in asynchronous mode. + :param functions_before_call: A tuple of functions to be called before the main function call. + :param functions_after_call: A tuple of functions to be called after the main function call. + :param description: A brief description of the agent's capabilities. + :param kwargs: Additional keyword arguments. + """ + + def __init__(self, + async_mode: bool = False, + functions_before_call: Tuple[Callable, + Tuple[Any], Tuple[Any]] | None = None, + functions_after_call: Tuple[Callable, + Tuple[Any], Tuple[Any]] | None = None, + description: str = "User Proxy Agent capable of receiving user input.", + **kwargs): + self.name = 'User' + self.description = description + self.async_mode = async_mode + self.functions_before_call = functions_before_call + self.functions_after_call = functions_after_call + + @AssistantAgent.function_call_decorator + def receive(self, *args, **kwargs): + """ + Receives messages from other agents and generates a response. + + :param agents: The list of agents involved in the conversation. + :type agents: List[str] + :param messages: The list of messages in the conversation. + :type messages: List[str] + :param termination_message: The termination message for the conversation. + :type termination_message: str + :return: The generated response. + :rtype: str + """ + return self._receive_user_input() + + @AssistantAgent.function_call_decorator + async def a_receive(self, *args, **kwargs): + """ + Asynchronously receives messages from other agents and generates a response. + + :param agents: The list of agents involved in the conversation. + :type agents: List[str] + :param messages: The list of messages in the conversation. + :type messages: List[str] + :param termination_message: The termination message for the conversation. + :type termination_message: str + :return: The generated response. + :rtype: str + """ + return await self.a_receive_user_input() + + def _receive_user_input(self): + """ + Receives user input and returns it as the response. + + :return: The user input. + :rtype: str + """ + return input('Provide feedback to chat_manager:') + + async def a_receive_user_input(self): + """ + Asynchronously receives user input and returns it as the response. + + :return: The user input. + :rtype: str + """ + import asyncio + return await asyncio.to_thread(input, 'Provide feedback to chat_manager:')