Skip to content

Agentic flow integration #1849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kairon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

def create_argument_parser():
from kairon.cli import importer, training, testing, conversations_deletion, translator, delete_logs,\
message_broadcast,content_importer, mail_channel_read
message_broadcast,content_importer, mail_channel_read, agentic_flow

parser = ArgumentParser(
prog="kairon",
Expand All @@ -64,6 +64,8 @@ def create_argument_parser():
message_broadcast.add_subparser(subparsers, parents=parent_parsers)
content_importer.add_subparser(subparsers, parents=parent_parsers)
mail_channel_read.add_subparser(subparsers, parents=parent_parsers)
agentic_flow.add_subparser(subparsers, parents=parent_parsers)

return parser


Expand Down
56 changes: 43 additions & 13 deletions kairon/actions/definitions/schedule.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from calendar import timegm
from datetime import datetime
import pickle
Expand All @@ -20,11 +21,12 @@
from kairon.actions.definitions.base import ActionsBase
from kairon.events.executors.factory import ExecutorFactory
from kairon.exceptions import AppException
from kairon.shared.actions.data_objects import ActionServerLogs, ScheduleAction
from kairon.shared.actions.data_objects import ActionServerLogs, ScheduleAction, ScheduleActionType
from kairon.shared.actions.exception import ActionFailure
from kairon.shared.actions.models import ActionType
from kairon.shared.actions.utils import ActionUtility
from kairon.shared.callback.data_objects import CallbackConfig
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import TASK_TYPE


Expand Down Expand Up @@ -78,7 +80,8 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
schedule_action = None
schedule_time = None
timezone = None
pyscript_code = None
execution_info = None
event_data = None
try:
action_config = self.retrieve_config()
dispatch_bot_response = action_config.get('dispatch_bot_response', True)
Expand All @@ -87,8 +90,6 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
tracker_data.update({'bot': self.bot})
schedule_action = action_config['schedule_action']
timezone = action_config['timezone']
callback = CallbackConfig.get_entry(name=schedule_action, bot=self.bot)
pyscript_code = callback['pyscript_code']
schedule_time, _ = ActionUtility.get_parameter_value(tracker_data,
action_config['schedule_time'],
self.bot)
Expand All @@ -98,12 +99,35 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
action_config['params_list'],
self.bot)
logger.info("schedule_data: " + str(schedule_data_log))
event_data = {'data': {'source_code': callback['pyscript_code'],
'predefined_objects': schedule_data
},
'date_time': date_parser.parse(schedule_time),
'timezone': action_config['timezone']
}

if action_config['schedule_action_type'] == ScheduleActionType.PYSCRIPT.value:
callback = CallbackConfig.get_entry(name=schedule_action, bot=self.bot)
event_data = {'data': {'source_code': callback['pyscript_code'],
'predefined_objects': schedule_data
},
'date_time': date_parser.parse(schedule_time),
'timezone': action_config['timezone']
}
execution_info = {
'pyscript_code': callback['pyscript_code'],
'type': ScheduleActionType.PYSCRIPT.value
}
elif action_config['schedule_action_type'] == ScheduleActionType.FLOW.value:
event_data = {
'data': {
'slot_data': json.dumps(schedule_data),
'flow_name': schedule_action,
'bot': self.bot,
'user': tracker.sender_id
},
'date_time': date_parser.parse(schedule_time),
'timezone': action_config['timezone'],
'is_flow': True
}
execution_info = {
'flow': schedule_action,
'type': ScheduleActionType.FLOW.value
}
await self.add_schedule_job(**event_data)
except Exception as e:
exception = e
Expand All @@ -128,7 +152,7 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
schedule_action=schedule_action,
schedule_time=schedule_time,
timezone=timezone,
pyscript_code=pyscript_code,
execution_info=execution_info,
data=schedule_data_log
).save()
return {}
Expand All @@ -137,13 +161,19 @@ async def add_schedule_job(self,
date_time: datetime,
data: Dict,
timezone: Text,
is_flow=False,
**kwargs):
func = obj_to_ref(ExecutorFactory.get_executor().execute_task)

_id = uuid7().hex
data['predefined_objects']['event'] = _id
args = (func, "scheduler_evaluator", data,)
kwargs.update({'task_type': TASK_TYPE.ACTION.value})
if is_flow:
args = (ExecutorFactory.get_executor(), EventClass.agentic_flow, data,)
kwargs.update({'task_type': TASK_TYPE.EVENT.value})
else:
kwargs.update({'task_type': TASK_TYPE.ACTION.value})
data['predefined_objects']['event'] = _id

trigger = DateTrigger(run_date=date_time, timezone=timezone)

next_run_time = trigger.get_next_fire_time(None, datetime.now(astimezone(timezone) or get_localzone()))
Expand Down
36 changes: 36 additions & 0 deletions kairon/cli/agentic_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import List
from rasa.cli import SubParsersAction

from kairon.events.definitions.agentic_flow import AgenticFlowEvent


def exec_agentic_flow(args):
AgenticFlowEvent(args.bot, args.user).execute(flow_name=args.flow_name, slot_data=args.slot_data)
Comment on lines +8 to +9
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for AgenticFlowEvent execution

The function doesn't handle any exceptions that might be raised during the execution of the agentic flow, which could lead to ungraceful termination of the CLI command.

def exec_agentic_flow(args):
+    try:
        AgenticFlowEvent(args.bot, args.user).execute(flow_name=args.flow_name, slot_data=args.slot_data)
+        return 0
+    except Exception as e:
+        print(f"Error executing agentic flow '{args.flow_name}': {str(e)}")
+        return 1
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def exec_agentic_flow(args):
AgenticFlowEvent(args.bot, args.user).execute(flow_name=args.flow_name, slot_data=args.slot_data)
def exec_agentic_flow(args):
try:
AgenticFlowEvent(args.bot, args.user).execute(flow_name=args.flow_name, slot_data=args.slot_data)
return 0
except Exception as e:
print(f"Error executing agentic flow '{args.flow_name}': {str(e)}")
return 1



def add_subparser(subparsers: SubParsersAction, parents: List[ArgumentParser]):
agentic_fow_parser = subparsers.add_parser(
"agentic-flow",
conflict_handler="resolve",
formatter_class=ArgumentDefaultsHelpFormatter,
parents=parents,
help="Mail channel initiate reading"
)
agentic_fow_parser.add_argument('bot',
type=str,
help="Bot id for which command is executed", action='store')

agentic_fow_parser.add_argument('user',
type=str,
help="Kairon user who is initiating the command", action='store')

agentic_fow_parser.add_argument('flow_name',
type=str,
help="Kairon flow name to execute", action='store')

agentic_fow_parser.add_argument('slot_data',
type=str,
help="json containing slot values dictionary", action='store')

agentic_fow_parser.set_defaults(func=exec_agentic_flow)
70 changes: 70 additions & 0 deletions kairon/events/definitions/agentic_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import json
from typing import Text
from loguru import logger
from kairon import Utility
from kairon.events.definitions.base import EventsBase
from kairon.exceptions import AppException
from kairon.shared.chat.agent.agent_flow import AgenticFlow
from kairon.shared.constants import EventClass


class AgenticFlowEvent(EventsBase):
"""
Event to execute an agentic flow
"""

def __init__(self, bot: Text, user: Text, **kwargs):
"""
Initialise event.
"""
self.bot = bot
self.user = user
self.flow_name = kwargs.get('flow_name')

def validate(self):
"""
validate mail channel exists and works properly
"""
if self.flow_name:
return AgenticFlow.flow_exists(self.bot, self.flow_name)

def enqueue(self, **kwargs):
"""
Send event to event server.
"""
try:
payload = {'bot': self.bot, 'user': self.user}
if flow_name := kwargs.get('flow_name'):
payload['flow_name'] = flow_name
self.flow_name = flow_name
if slot_data := kwargs.get('slot_data'):
payload['slot_data'] = slot_data
self.validate()
Utility.request_event_server(EventClass.agentic_flow, payload)
except Exception as e:
logger.error(str(e))
raise AppException(e)

def execute(self, **kwargs):
"""
Execute the event.
"""
try:
if flow_name := kwargs.get('flow_name'):
self.flow_name = flow_name

slot_vals = {}
if slot_data := kwargs.get('slot_data'):
if isinstance(slot_data, str):
slot_data = json.loads(slot_data)
slot_vals = slot_data
flow = AgenticFlow(bot=self.bot, slot_vals=slot_vals)
resp, errors = asyncio.run(flow.execute_rule(self.flow_name))
logger.info(resp)
if errors:
logger.error(f"Failed to execute flow {self.flow_name}. Errors: {errors}")
raise AppException(f"Failed to execute flow {self.flow_name}. Errors: {errors}")
except Exception as e:
logger.error(str(e))
raise AppException(f"Failed to execute flow {self.flow_name} for bot {self.bot}. Error: {str(e)}")
2 changes: 2 additions & 0 deletions kairon/events/definitions/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from kairon.events.definitions.agentic_flow import AgenticFlowEvent
from kairon.events.definitions.content_importer import DocContentImporterEvent
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
Expand All @@ -23,6 +24,7 @@ class EventFactory:
EventClass.message_broadcast: MessageBroadcastEvent,
EventClass.content_importer: DocContentImporterEvent,
EventClass.mail_channel_read_mails: MailReadEvent,
EventClass.agentic_flow: AgenticFlowEvent
}

@staticmethod
Expand Down
8 changes: 8 additions & 0 deletions kairon/shared/actions/data_objects.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum

from mongoengine import (
EmbeddedDocument,
Expand Down Expand Up @@ -963,6 +964,10 @@ def clean(self):
raise ValidationError("Action name cannot start with utter_")


class ScheduleActionType(Enum):
PYSCRIPT = "pyscript"
FLOW = "flow"

@auditlogger.log
@push_notification.apply
class ScheduleAction(Auditlog):
Expand All @@ -972,6 +977,9 @@ class ScheduleAction(Auditlog):
schedule_time = EmbeddedDocumentField(CustomActionDynamicParameters)
timezone = StringField(default="UTC", required=True)
schedule_action = StringField(required=True)
schedule_action_type = StringField(
default=ScheduleActionType.PYSCRIPT.value, choices=[type.value for type in ScheduleActionType]
)
response_text = StringField(required=False)
params_list = ListField(
EmbeddedDocumentField(CustomActionRequestParameters), required=False
Expand Down
14 changes: 7 additions & 7 deletions kairon/shared/callback/data_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_all_names(bot) -> list[str]:
return list(names)

@staticmethod
def get_entry(bot, name) -> dict:
def get_entry(bot :str, name :str) -> dict:
entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
if not entry:
raise AppException(f"Callback Configuration with name '{name}' does not exist!")
Expand Down Expand Up @@ -147,7 +147,7 @@ def create_entry(bot: str,
return config.to_mongo().to_dict()

@staticmethod
def get_auth_token(bot, name) -> tuple[str, bool]:
def get_auth_token(bot: str, name: str) -> tuple[str, bool]:
entry = CallbackConfig.objects(bot=bot, name__iexact=name).first()
if not entry:
raise AppException(f"Callback Configuration with name '{name}' does not exist!")
Expand Down Expand Up @@ -281,7 +281,7 @@ def create_entry(name: str, callback_config_name: str, bot: str, sender_id: str,
return callback_url, identifier, is_standalone

@staticmethod
def get_value_from_json(json_obj, path):
def get_value_from_json(json_obj: Any, path: str):
keys = path.split('.')
value = json_obj
try:
Expand All @@ -295,7 +295,7 @@ def get_value_from_json(json_obj, path):
return value

@staticmethod
def validate_entry(token: str, identifier: Optional[str] = None, request_body: Any = None):
def validate_entry(token: str, identifier: Optional[str] = None, request_body: Any = None) -> tuple[dict, dict]:
check_nonempty_string(token)
config_entry = CallbackConfig.verify_auth_token(token)

Expand Down Expand Up @@ -363,7 +363,7 @@ def create_success_entry(name: str,
request_data: Any,
metadata: dict,
callback_url: str,
callback_source: str):
callback_source: str) -> dict:
check_nonempty_string(name)
check_nonempty_string(bot)
check_nonempty_string(identifier)
Expand Down Expand Up @@ -396,7 +396,7 @@ def create_failure_entry(name: str,
request_data: Any,
metadata: dict,
callback_url: str,
callback_source: str):
callback_source: str) -> dict:
check_nonempty_string(name)
check_nonempty_string(bot)
check_nonempty_string(identifier)
Expand All @@ -419,7 +419,7 @@ def create_failure_entry(name: str,
return record.to_mongo().to_dict()

@staticmethod
def get_logs(query: dict, offset: int, limit: int):
def get_logs(query: dict, offset: int, limit: int) -> tuple[list[dict], int]:
logs = CallbackLog.objects(**query).skip(offset).limit(limit).exclude('id').order_by('-timestamp').to_json()
logs_dict_list = json.loads(logs)
for log in logs_dict_list:
Expand Down
Loading
Loading