Skip to content

Developed an end-to-end catalog ingestion flow from Petpooja (POS) to Meta #1932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions email.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ email:
book_a_demo_subject: ${EMAIL_TEMPLATES_BOOK_A_DEMO_SUBJECT:"kAIron Demo Requested"}
member_left_bot_subject: ${EMAIL_TEMPLATES_USER_LEFT_BOT_SUBJECT:"User has left the BOT_NAME bot"}
member_left_bot_mail_body: ${EMAIL_TEMPLATES_USER_LEFT_BOT_BODY:"User USER_NAME has left the BOT_NAME bot."}
catalog_sync_status_subject: ${EMAIL_TEMPLATES_CATALOG_SYNC_UPDATE:"Catalog Sync Update"}

3 changes: 2 additions & 1 deletion kairon/api/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from kairon.api.app.routers import auth, augment, history, user, account, idp, system
from kairon.api.app.routers.bot import action, bot, agents, secrets, multilingual, metric, data, \
channels, custom_widgets
channels, custom_widgets, integrations
from kairon.api.models import Response
from kairon.exceptions import AppException
from kairon.shared.account.processor import AccountProcessor
Expand Down Expand Up @@ -269,3 +269,4 @@ def healthcheck():
app.include_router(system.router, prefix="/api/system", tags=["Application"])
app.include_router(data.router, prefix="/api/bot/{bot}/data", tags=["File Upload/Download"])
app.include_router(custom_widgets.router, prefix="/api/bot/{bot}/widgets", tags=["Custom analytical widgets"])
app.include_router(integrations.router, prefix="/api/bot/integration", tags=["Data Integrations"])
34 changes: 29 additions & 5 deletions kairon/api/app/routers/bot/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from kairon.shared.cognition.data_objects import CognitionSchema
from kairon.shared.cognition.processor import CognitionDataProcessor
from kairon.shared.concurrency.actors.factory import ActorFactory
from kairon.shared.constants import ActorType
from kairon.shared.constants import ActorType, CatalogSyncClass
from kairon.shared.constants import DESIGNER_ACCESS
from kairon.shared.data.data_models import POSIntegrationRequest
from kairon.shared.data.data_models import BulkDeleteRequest
from kairon.shared.data.processor import MongoProcessor
from kairon.shared.models import User
Expand Down Expand Up @@ -342,7 +343,7 @@ async def download_error_csv(
async def knowledge_vault_sync(
primary_key_col: str,
collection_name: str,
event_type: str,
sync_type: str,
data: List[dict],
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):
Expand All @@ -351,7 +352,7 @@ async def knowledge_vault_sync(
"""
data = [{key.lower(): value for key, value in row.items()} for row in data]

error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, current_user.get_bot())
error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), sync_type.lower(), data, current_user.get_bot())

if error_summary:
return Response(
Expand All @@ -361,11 +362,34 @@ async def knowledge_vault_sync(
error_code=400
)

await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data,
await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), sync_type.lower(), data,
current_user.get_bot(), current_user.get_user())

return Response(
success=True,
message="Processing completed successfully",
data=None
)
)


@router.post("/integrations/add", response_model=Response)
async def add_pos_integration_config(
request_data: POSIntegrationRequest,
sync_type: str,
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
):
"""
Add data integration config
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider error handling for catalog provider mappings.

The method load_catalog_provider_mappings() is called without error handling. If this fails, it would result in an unhandled exception.

-    CognitionDataProcessor.load_catalog_provider_mappings()
+    try:
+        CognitionDataProcessor.load_catalog_provider_mappings()
+    except Exception as e:
+        raise AppException(f"Failed to load catalog provider mappings: {str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CognitionDataProcessor.load_catalog_provider_mappings()
try:
CognitionDataProcessor.load_catalog_provider_mappings()
except Exception as e:
raise AppException(f"Failed to load catalog provider mappings: {str(e)}")

CognitionDataProcessor.load_catalog_provider_mappings()

if request_data.provider not in CatalogSyncClass.__members__.values():
raise AppException("Invalid Provider")
Comment on lines +386 to +388
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect Enum membership check – always evaluates to False

CatalogSyncClass.__members__.values() returns the Enum members, not their
.value strings, while request_data.provider is a plain string.
Use the _value2member_map_ helper or compare against a comprehension.

-if request_data.provider not in CatalogSyncClass.__members__.values():
+if request_data.provider not in CatalogSyncClass._value2member_map_:
     raise AppException("Invalid Provider")

Without this fix every valid provider string will be rejected.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if request_data.provider not in CatalogSyncClass.__members__.values():
raise AppException("Invalid Provider")
if request_data.provider not in CatalogSyncClass._value2member_map_:
raise AppException("Invalid Provider")


CognitionDataProcessor.add_bot_sync_config(request_data, current_user.get_bot(), current_user.get_user())

integration_endpoint = cognition_processor.save_pos_integration_config(
request_data.dict(), current_user.get_bot(), current_user.get_user(), sync_type
)

return Response(message='POS Integration Complete', data=integration_endpoint)
84 changes: 84 additions & 0 deletions kairon/api/app/routers/bot/integrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Text

from fastapi import Security, APIRouter, Path
from starlette.requests import Request

from kairon.api.models import Response
from kairon.events.definitions.catalog_sync import CatalogSync
from kairon.exceptions import AppException
from kairon.shared.auth import Authentication
from kairon.shared.catalog_sync.data_objects import CatalogSyncLogs
from kairon.shared.cognition.processor import CognitionDataProcessor
from kairon.shared.constants import CatalogProvider
from kairon.shared.constants import DESIGNER_ACCESS
from kairon.shared.models import User

router = APIRouter()
cognition_processor = CognitionDataProcessor()

@router.post("/{provider}/{sync_type}/{bot}/{token}", response_model=Response)
async def sync_data(
request: Request,
provider: CatalogProvider = Path(description="Catalog provider name",
examples=[CatalogProvider.PETPOOJA.value]),
bot: Text = Path(description="Bot id"),
sync_type: Text = Path(description="Sync Type"),
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
token: str = Path(description="JWT token for authentication"),
):
Comment on lines +22 to +28
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect examples kw-arg for Path; should be example

FastAPI’s Path accepts a single example, not examples. The plural keyword will be ignored, and Ruff’s B008 warning still applies.

Replace examples=[…] by example=… for all three occurrences.

-    provider: CatalogProvider = Path(description="Catalog provider name",
-                                 examples=[CatalogProvider.PETPOOJA.value]),
+    provider: CatalogProvider = Path(
+        description="Catalog provider name",
+        example=CatalogProvider.PETPOOJA.value,
+    ),

Repeat for the rerun_sync endpoint as well.

Also applies to: 55-61

🧰 Tools
🪛 Ruff (0.8.2)

23-24: Do not perform function call Path in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


25-25: Do not perform function call Path in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


26-26: Do not perform function call Path in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


27-27: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

"""
Handles incoming data from catalog_sync (e.g., Petpooja) for processing, validation, and eventual storage.
"""

request_body = await request.json()

event = CatalogSync(
bot=bot,
user=current_user.get_user(),
provider=provider,
sync_type=sync_type,
token=token
)

is_event_data = await event.validate(request_body=request_body)
if is_event_data is True:
event.enqueue()
return {"message": "Sync in progress! Check logs."}
else:
raise AppException(is_event_data)

Comment on lines +33 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Harden request-body parsing and error propagation

await request.json() will raise JSONDecodeError on invalid JSON, returning a 500.
Wrap the call and surface a clear 400 to the client instead.

-    request_body = await request.json()
+    try:
+        request_body = await request.json()
+    except ValueError as e:
+        raise AppException(f"Malformed JSON payload: {e}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
request_body = await request.json()
event = CatalogSync(
bot=bot,
user=current_user.get_user(),
provider=provider,
sync_type=sync_type,
token=token
)
is_event_data = await event.validate(request_body=request_body)
if is_event_data is True:
event.enqueue()
return {"message": "Sync in progress! Check logs."}
else:
raise AppException(is_event_data)
try:
request_body = await request.json()
except ValueError as e:
raise AppException(f"Malformed JSON payload: {e}") from e
event = CatalogSync(
bot=bot,
user=current_user.get_user(),
provider=provider,
sync_type=sync_type,
token=token
)
is_event_data = await event.validate(request_body=request_body)
if is_event_data is True:
event.enqueue()
return {"message": "Sync in progress! Check logs."}
else:
raise AppException(is_event_data)



@router.post("/{provider}/{sync_type}/{bot}/{token}/{execution_id}", response_model=Response)
async def rerun_sync(
    provider: CatalogProvider = Path(description="Catalog provider name",
                                     examples=[CatalogProvider.PETPOOJA.value]),
    bot: Text = Path(description="Bot id"),
    sync_type: Text = Path(description="Sync Type"),
    current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS),
    token: str = Path(description="JWT token for authentication"),
    execution_id: str = Path(description="Execution id"),
):
    """
    Re-runs a catalog sync using the payload stored in an existing sync log.

    The log entry is looked up by ``execution_id``; its ``raw_payload`` is passed
    through validation again and, when validation returns ``True``, the event is
    enqueued.

    :raises AppException: if no sync log exists for ``execution_id``, or if
        validation returns anything other than ``True`` (the returned value is
        used as the error message).
    """
    # NOTE(review): the lookup filters on execution_id only, not on bot —
    # confirm that an execution id cannot be replayed against a different bot.
    stored_log = CatalogSyncLogs.objects(execution_id=execution_id).first()
    if not stored_log:
        raise AppException(f"Sync log with execution ID {execution_id} not found.")

    replay_payload = stored_log.raw_payload

    sync_event = CatalogSync(
        bot=bot,
        user=current_user.get_user(),
        provider=provider,
        sync_type=sync_type,
        token=token
    )

    validation_result = await sync_event.validate(request_body=replay_payload)
    # Anything other than the literal True is treated as a validation failure.
    if validation_result is not True:
        raise AppException(validation_result)

    sync_event.enqueue()
    return {"message": "Sync in progress! Check logs."}
18 changes: 18 additions & 0 deletions kairon/catalog_sync/definitions/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from abc import abstractmethod


class CatalogSyncBase:
    """
    Base class for catalog sync provider implementations.

    Every hook raises ``NotImplementedError`` until a provider overrides it.
    All hooks accept arbitrary keyword arguments so that calling one on a
    provider that has not overridden it (for example with ``request_body=...``)
    reports the missing implementation instead of failing with ``TypeError``.
    """

    # NOTE: this class does not use ABCMeta, so @abstractmethod is not enforced
    # at instantiation time; the explicit raise in each hook is what signals a
    # missing override.

    @abstractmethod
    def validate(self, **kwargs):
        """
        Validate the incoming sync request.

        :param kwargs: provider-specific keyword arguments
        :raises NotImplementedError: always, unless overridden by a provider
        """
        raise NotImplementedError("Provider not implemented")

    @abstractmethod
    def preprocess(self, **kwargs):
        """
        Prepare the received data ahead of execution.

        :param kwargs: provider-specific keyword arguments
        :raises NotImplementedError: always, unless overridden by a provider
        """
        raise NotImplementedError("Provider not implemented")

    @abstractmethod
    def execute(self, **kwargs):
        """
        Run the sync.

        :param kwargs: provider-specific keyword arguments
        :raises NotImplementedError: always, unless overridden by a provider
        """
        raise NotImplementedError("Provider not implemented")
29 changes: 29 additions & 0 deletions kairon/catalog_sync/definitions/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from kairon.events.definitions.petpooja_sync import PetpoojaSync
from kairon.exceptions import AppException
from kairon.shared.constants import CatalogSyncClass


class CatalogSyncFactory:
    """Resolves a catalog provider name to the class implementing its sync."""

    # Registry of supported providers, keyed by CatalogSyncClass member.
    __provider_implementations = {
        CatalogSyncClass.petpooja: PetpoojaSync,
    }

    @staticmethod
    def get_instance(provider: str):
        """
        Look up the sync implementation registered for a provider.

        :param provider: catalog provider name (e.g., "petpooja"); it is
            lower-cased before being resolved against CatalogSyncClass
        :return: the registered sync class (the class itself, not an instance)
        :raises AppException: if the name is not a CatalogSyncClass value, or
            if no implementation is registered for it
        """
        normalized_name = provider.lower()
        try:
            registry_key = CatalogSyncClass(normalized_name)
        except ValueError:
            valid_syncs = [member.value for member in CatalogSyncClass]
            raise AppException(f"'{provider}' is not a valid catalog sync provider. Accepted types: {valid_syncs}")

        registry = CatalogSyncFactory.__provider_implementations
        implementation = registry.get(registry_key)
        if implementation:
            return implementation

        raise AppException(f"No implementation found for provider '{provider}'.")
73 changes: 73 additions & 0 deletions kairon/events/definitions/catalog_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Text
from kairon import Utility
from loguru import logger

from kairon.catalog_sync.definitions.factory import CatalogSyncFactory
from kairon.events.definitions.base import EventsBase
from kairon.shared.account.processor import AccountProcessor
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import SyncType, SYNC_STATUS
from kairon.shared.catalog_sync.catalog_sync_log_processor import CatalogSyncLogProcessor


class CatalogSync(EventsBase):
"""
Validates and processes data from catalog (e.g., Petpooja) before importing it
to knowledge vault and meta
"""

def __init__(self, bot: Text, user: Text, provider: Text, **kwargs):
    """
    Build the provider-specific sync handler for this event.

    :param bot: bot id handed to the provider handler
    :param user: user handed to the provider handler
    :param provider: catalog provider name, resolved through CatalogSyncFactory
    :param kwargs: optional ``sync_type`` (defaults to SyncType.item_toggle)
        and ``token`` (defaults to an empty string)
    """
    handler_cls = CatalogSyncFactory.get_instance(provider)
    requested_sync_type = kwargs.get("sync_type", SyncType.item_toggle)
    auth_token = kwargs.get("token", "")
    self.catalog_sync = handler_cls(
        bot=bot,
        user=user,
        provider=provider,
        sync_type=requested_sync_type,
        token=auth_token,
    )
    # The handler starts without a payload; validate()/execute() fill it in.
    self.catalog_sync.data = []

async def validate(self, **kwargs):
    """
    Store the received request body on the provider handler and run the
    provider's own validation on it.

    :param kwargs: expects ``request_body``; ``None`` is used when it is absent
    :return: whatever the provider handler's ``validate`` returns
    """
    payload = kwargs.get("request_body")
    self.catalog_sync.data = payload
    return await self.catalog_sync.validate(request_body=payload)

def enqueue(self, **kwargs):
    """
    Record an ENQUEUED sync log and send the event to the event server.

    If building the payload, writing the log, or contacting the event server
    raises, the enqueued log for the bot is deleted and the exception is
    re-raised.
    """
    try:
        handler = self.catalog_sync
        payload = dict(
            bot=handler.bot,
            user=handler.user,
            provider=handler.provider,
            sync_type=handler.sync_type,
            token=handler.token,
            data=handler.data,
        )
        # NOTE(review): provider and sync_type are passed positionally —
        # confirm they line up with add_log's parameter order.
        CatalogSyncLogProcessor.add_log(
            handler.bot,
            handler.user,
            handler.provider,
            handler.sync_type,
            sync_status=SYNC_STATUS.ENQUEUED.value,
        )
        Utility.request_event_server(EventClass.catalog_integration, payload)
    except Exception:
        CatalogSyncLogProcessor.delete_enqueued_event_log(self.catalog_sync.bot)
        raise
Comment on lines +57 to +61
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

add_log positional args look swapped – raw_payload will contain provider enum

CatalogSyncLogProcessor.add_log is invoked with positional arguments:

CatalogSyncLogProcessor.add_log(self.catalog_sync.bot,
                                self.catalog_sync.user,
                                self.catalog_sync.provider,
                                self.catalog_sync.sync_type,
                                sync_status=SYNC_STATUS.ENQUEUED.value)

If the function signature is add_log(bot, user, *, provider=None, sync_type=None, raw_payload=None, ...), the third
positional argument will map to raw_payload, not provider, leaving the real provider unset and the raw payload wrong.
Pass them as keyword arguments instead:

-            CatalogSyncLogProcessor.add_log(self.catalog_sync.bot, self.catalog_sync.user, self.catalog_sync.provider, self.catalog_sync.sync_type, sync_status=SYNC_STATUS.ENQUEUED.value)
+            CatalogSyncLogProcessor.add_log(
+                self.catalog_sync.bot,
+                self.catalog_sync.user,
+                provider=self.catalog_sync.provider,
+                sync_type=self.catalog_sync.sync_type,
+                sync_status=SYNC_STATUS.ENQUEUED.value,
+            )


async def execute(self, **kwargs):
"""
Execute the document content import event.
"""
AccountProcessor.load_system_properties()
self.catalog_sync.data = kwargs.get("data", [])
try:
initiate_import, stale_primary_keys= await self.catalog_sync.preprocess(request_body=self.catalog_sync.data)
await self.catalog_sync.execute(data=self.catalog_sync.data, initiate_import = initiate_import,stale_primary_keys = stale_primary_keys)
except Exception as e:
Comment on lines +63 to +72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Exceptions in execute are swallowed – sync status never updated

execute only logs the error and neither:

  1. Updates the log entry to FAILED, nor
  2. Re-raises the exception for upstream handling.

This leaves the sync perpetually in “PREPROCESSING” state.

Add a CatalogSyncLogProcessor.add_log(..., sync_status=SYNC_STATUS.FAILED.value, exception=str(e))
and re-raise to ensure observability.

logger.error(str(e))
4 changes: 3 additions & 1 deletion kairon/events/definitions/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from kairon.events.definitions.agentic_flow import AgenticFlowEvent
from kairon.events.definitions.catalog_sync import CatalogSync
from kairon.events.definitions.content_importer import DocContentImporterEvent
from kairon.events.definitions.data_importer import TrainingDataImporterEvent
from kairon.events.definitions.faq_importer import FaqDataImporterEvent
Expand All @@ -24,7 +25,8 @@ class EventFactory:
EventClass.message_broadcast: MessageBroadcastEvent,
EventClass.content_importer: DocContentImporterEvent,
EventClass.mail_channel_read_mails: MailReadEvent,
EventClass.agentic_flow: AgenticFlowEvent
EventClass.agentic_flow: AgenticFlowEvent,
EventClass.catalog_integration: CatalogSync
}

@staticmethod
Expand Down
Loading
Loading