-
Notifications
You must be signed in to change notification settings - Fork 84
Developed an end-to-end catalog ingestion flow from Petpooja(POS) to Meta #1932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
893ee25
ebe1dd1
e43c096
780a404
a72c552
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,7 +1,7 @@ | ||||||||||
import os | ||||||||||
from typing import List | ||||||||||
from typing import List, Text | ||||||||||
|
||||||||||
from fastapi import UploadFile, File, Security, APIRouter, Query, HTTPException | ||||||||||
from fastapi import UploadFile, File, Security, APIRouter, Query, HTTPException, Path | ||||||||||
from starlette.requests import Request | ||||||||||
from starlette.responses import FileResponse | ||||||||||
|
||||||||||
|
@@ -13,8 +13,9 @@ | |||||||||
from kairon.shared.cognition.data_objects import CognitionSchema | ||||||||||
from kairon.shared.cognition.processor import CognitionDataProcessor | ||||||||||
from kairon.shared.concurrency.actors.factory import ActorFactory | ||||||||||
from kairon.shared.constants import ActorType | ||||||||||
from kairon.shared.constants import ActorType, CatalogSyncClass | ||||||||||
from kairon.shared.constants import DESIGNER_ACCESS | ||||||||||
from kairon.shared.data.data_models import POSIntegrationRequest | ||||||||||
from kairon.shared.data.data_models import BulkDeleteRequest | ||||||||||
from kairon.shared.data.processor import MongoProcessor | ||||||||||
from kairon.shared.models import User | ||||||||||
|
@@ -342,7 +343,7 @@ async def download_error_csv( | |||||||||
async def knowledge_vault_sync( | ||||||||||
primary_key_col: str, | ||||||||||
collection_name: str, | ||||||||||
event_type: str, | ||||||||||
sync_type: str, | ||||||||||
data: List[dict], | ||||||||||
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), | ||||||||||
): | ||||||||||
|
@@ -351,7 +352,7 @@ async def knowledge_vault_sync( | |||||||||
""" | ||||||||||
data = [{key.lower(): value for key, value in row.items()} for row in data] | ||||||||||
|
||||||||||
error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, current_user.get_bot()) | ||||||||||
error_summary = cognition_processor.validate_data(primary_key_col.lower(), collection_name.lower(), sync_type.lower(), data, current_user.get_bot()) | ||||||||||
|
||||||||||
if error_summary: | ||||||||||
return Response( | ||||||||||
|
@@ -361,11 +362,34 @@ async def knowledge_vault_sync( | |||||||||
error_code=400 | ||||||||||
) | ||||||||||
|
||||||||||
await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), event_type.lower(), data, | ||||||||||
await cognition_processor.upsert_data(primary_key_col.lower(), collection_name.lower(), sync_type.lower(), data, | ||||||||||
current_user.get_bot(), current_user.get_user()) | ||||||||||
|
||||||||||
return Response( | ||||||||||
success=True, | ||||||||||
message="Processing completed successfully", | ||||||||||
data=None | ||||||||||
) | ||||||||||
) | ||||||||||
|
||||||||||
|
||||||||||
@router.post("/integrations/add", response_model=Response) | ||||||||||
async def add_pos_integration_config( | ||||||||||
request_data: POSIntegrationRequest, | ||||||||||
sync_type: str, | ||||||||||
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), | ||||||||||
): | ||||||||||
""" | ||||||||||
Add data integration config | ||||||||||
""" | ||||||||||
CognitionDataProcessor.load_catalog_provider_mappings() | ||||||||||
|
||||||||||
if request_data.provider not in CatalogSyncClass.__members__.values(): | ||||||||||
raise AppException("Invalid Provider") | ||||||||||
Comment on lines
+386
to
+388
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect Enum membership check – always evaluates to False
-if request_data.provider not in CatalogSyncClass.__members__.values():
+if request_data.provider not in CatalogSyncClass._value2member_map_:
raise AppException("Invalid Provider") Without this fix every valid provider string will be rejected. 📝 Committable suggestion
Suggested change
|
||||||||||
|
||||||||||
CognitionDataProcessor.add_bot_sync_config(request_data, current_user.get_bot(), current_user.get_user()) | ||||||||||
|
||||||||||
integration_endpoint = cognition_processor.save_pos_integration_config( | ||||||||||
request_data.dict(), current_user.get_bot(), current_user.get_user(), sync_type | ||||||||||
) | ||||||||||
|
||||||||||
return Response(message='POS Integration Complete', data=integration_endpoint) |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,85 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from typing import Text | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from fastapi import Security, APIRouter, Path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from starlette.requests import Request | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.api.models import Response | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.events.definitions.catalog_sync import CatalogSync | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.exceptions import AppException | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.auth import Authentication | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.catalog_sync.data_objects import CatalogSyncLogs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.cognition.processor import CognitionDataProcessor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.constants import CatalogProvider | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.constants import DESIGNER_ACCESS | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.models import User | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
from kairon.shared.utils import MailUtility | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
router = APIRouter() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cognition_processor = CognitionDataProcessor() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@router.post("/{provider}/{sync_type}/{bot}/{token}", response_model=Response) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def sync_data( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request: Request, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
provider: CatalogProvider = Path(description="Catalog provider name", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
examples=[CatalogProvider.PETPOOJA.value]), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bot: Text = Path(description="Bot id"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sync_type: Text = Path(description="Sync Type"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
token: str = Path(description="JWT token for authentication"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+22
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect FastAPI’s - provider: CatalogProvider = Path(description="Catalog provider name",
- examples=[CatalogProvider.PETPOOJA.value]),
+ provider: CatalogProvider = Path(
+ description="Catalog provider name",
+ example=CatalogProvider.PETPOOJA.value,
+ ), Repeat for the Also applies to: 55-61 🧰 Tools🪛 Ruff (0.8.2)23-24: Do not perform function call (B008) 25-25: Do not perform function call (B008) 26-26: Do not perform function call (B008) 27-27: Do not perform function call (B008) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Handles incoming data from catalog_sync (e.g., Petpooja) for processing, validation, and eventual storage. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request_body = await request.json() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event = CatalogSync( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bot=bot, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
user=current_user.get_user(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
provider=provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sync_type=sync_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
token=token | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
is_event_data = await event.validate(request_body=request_body) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if is_event_data is True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.enqueue() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return {"message": "Sync in progress! Check logs."} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise AppException(is_event_data) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+33
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Harden request-body parsing and error propagation
- request_body = await request.json()
+ try:
+ request_body = await request.json()
+ except ValueError as e:
+ raise AppException(f"Malformed JSON payload: {e}") from e 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@router.post("/{provider}/{sync_type}/{bot}/{token}/{execution_id}", response_model=Response) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async def rerun_sync( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
provider: CatalogProvider = Path(description="Catalog provider name", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
examples=[CatalogProvider.PETPOOJA.value]), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bot: Text = Path(description="Bot id"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sync_type: Text = Path(description="Sync Type"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
token: str = Path(description="JWT token for authentication"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
execution_id: str = Path(description="Execution id"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Handles incoming data from catalog_sync (e.g., Petpooja) for processing, validation, and eventual storage. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sync_log_entry = CatalogSyncLogs.objects(execution_id=execution_id).first() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if not sync_log_entry: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise AppException(f"Sync log with execution ID {execution_id} not found.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
request_body = sync_log_entry.raw_payload | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event = CatalogSync( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bot=bot, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
user=current_user.get_user(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
provider=provider, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
sync_type=sync_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
token=token | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
is_event_data = await event.validate(request_body=request_body) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if is_event_data is True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
event.enqueue() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return {"message": "Sync in progress! Check logs."} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
raise AppException(is_event_data) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from abc import abstractmethod | ||
|
||
|
||
class CatalogSyncBase: | ||
|
||
"""Base class to create events""" | ||
|
||
@abstractmethod | ||
def validate(self): | ||
raise NotImplementedError("Provider not implemented") | ||
|
||
@abstractmethod | ||
def preprocess(self): | ||
raise NotImplementedError("Provider not implemented") | ||
|
||
@abstractmethod | ||
def execute(self, **kwargs): | ||
raise NotImplementedError("Provider not implemented") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from kairon.events.definitions.petpooja_sync import PetpoojaSync | ||
from kairon.exceptions import AppException | ||
from kairon.shared.constants import CatalogSyncClass | ||
|
||
|
||
class CatalogSyncFactory: | ||
|
||
__provider_implementations = { | ||
CatalogSyncClass.petpooja: PetpoojaSync, | ||
} | ||
|
||
@staticmethod | ||
def get_instance(provider: str): | ||
""" | ||
Factory to retrieve catalog provider implementation for execution. | ||
:param provider: catalog provider name (e.g., "petpooja") | ||
:return: Corresponding Sync class | ||
""" | ||
try: | ||
provider_enum = CatalogSyncClass(provider.lower()) | ||
except ValueError: | ||
valid_syncs = [sync.value for sync in CatalogSyncClass] | ||
raise AppException(f"'{provider}' is not a valid catalog sync provider. Accepted types: {valid_syncs}") | ||
|
||
sync_class = CatalogSyncFactory.__provider_implementations.get(provider_enum) | ||
if not sync_class: | ||
raise AppException(f"No implementation found for provider '{provider}'.") | ||
|
||
return sync_class |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from typing import Text | ||
from kairon import Utility | ||
from loguru import logger | ||
|
||
from kairon.catalog_sync.definitions.factory import CatalogSyncFactory | ||
from kairon.events.definitions.base import EventsBase | ||
from kairon.shared.account.processor import AccountProcessor | ||
from kairon.shared.constants import EventClass | ||
from kairon.shared.data.constant import SyncType, SYNC_STATUS | ||
from kairon.shared.catalog_sync.catalog_sync_log_processor import CatalogSyncLogProcessor | ||
|
||
|
||
class CatalogSync(EventsBase): | ||
""" | ||
Validates and processes data from catalog (e.g., Petpooja) before importing it | ||
to knowledge vault and meta | ||
""" | ||
|
||
def __init__(self, bot: Text, user: Text, provider: Text, **kwargs): | ||
""" | ||
Initialise event. | ||
""" | ||
sync_class = CatalogSyncFactory.get_instance(provider) | ||
self.catalog_sync = sync_class( | ||
bot=bot, | ||
user=user, | ||
provider=provider, | ||
sync_type=kwargs.get("sync_type", SyncType.item_toggle), | ||
token=kwargs.get("token", "") | ||
) | ||
self.catalog_sync.data = [] | ||
|
||
async def validate(self, **kwargs): | ||
""" | ||
Validates if an event is already running for that particular bot and | ||
checks if the event trigger limit has been exceeded. | ||
Then, preprocesses the received request | ||
""" | ||
request = kwargs.get("request_body") | ||
self.catalog_sync.data = request | ||
is_event_data = await self.catalog_sync.validate(request_body = request) | ||
return is_event_data | ||
|
||
def enqueue(self, **kwargs): | ||
""" | ||
Send event to event server | ||
""" | ||
try: | ||
payload = { | ||
'bot': self.catalog_sync.bot, | ||
'user': self.catalog_sync.user, | ||
'provider': self.catalog_sync.provider, | ||
'sync_type': self.catalog_sync.sync_type, | ||
'token': self.catalog_sync.token, | ||
'data': self.catalog_sync.data | ||
} | ||
CatalogSyncLogProcessor.add_log(self.catalog_sync.bot, self.catalog_sync.user, self.catalog_sync.provider, self.catalog_sync.sync_type, sync_status=SYNC_STATUS.ENQUEUED.value) | ||
Utility.request_event_server(EventClass.catalog_integration, payload) | ||
except Exception as e: | ||
CatalogSyncLogProcessor.delete_enqueued_event_log(self.catalog_sync.bot) | ||
raise e | ||
Comment on lines
+57
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
CatalogSyncLogProcessor.add_log(self.catalog_sync.bot,
self.catalog_sync.user,
self.catalog_sync.provider,
self.catalog_sync.sync_type,
sync_status=SYNC_STATUS.ENQUEUED.value) If the function signature is - CatalogSyncLogProcessor.add_log(self.catalog_sync.bot, self.catalog_sync.user, self.catalog_sync.provider, self.catalog_sync.sync_type, sync_status=SYNC_STATUS.ENQUEUED.value)
+ CatalogSyncLogProcessor.add_log(
+ self.catalog_sync.bot,
+ self.catalog_sync.user,
+ provider=self.catalog_sync.provider,
+ sync_type=self.catalog_sync.sync_type,
+ sync_status=SYNC_STATUS.ENQUEUED.value,
+ ) |
||
|
||
async def execute(self, **kwargs): | ||
""" | ||
Execute the document content import event. | ||
""" | ||
AccountProcessor.load_system_properties() | ||
self.catalog_sync.data = kwargs.get("data", []) | ||
try: | ||
initiate_import, stale_primary_keys= await self.catalog_sync.preprocess(request_body=self.catalog_sync.data) | ||
await self.catalog_sync.execute(data=self.catalog_sync.data, initiate_import = initiate_import,stale_primary_keys = stale_primary_keys) | ||
except Exception as e: | ||
Comment on lines
+63
to
+72
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Exceptions in
This leaves the sync perpetually in “PREPROCESSING” state. Add a |
||
logger.error(str(e)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider error handling for catalog provider mappings.
The method
load_catalog_provider_mappings()
is called without error handling. If this fails, it would result in an unhandled exception.📝 Committable suggestion