Skip to content

Alpha/task19 - Done Razponza Integration #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions app/api/endpoints/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Annotated, Any

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status
from pymongo.database import Database

from app.api.custom import RouteErrorHandler
Expand All @@ -9,7 +9,7 @@
from app.schemas.common.api_response import ApiGeneralResponse
from app.schemas.common.object_id import PyObjectId
from app.schemas.user import UserRead
from app.services.task import TaskService
from app.services.scheduler import scheduled_task_handler

router = APIRouter(route_class=RouteErrorHandler, tags=["Task"])

Expand All @@ -18,6 +18,7 @@
def run_task(
db: Annotated[Database[dict[str, Any]], Depends(get_db)],
current_user: Annotated[UserRead, Depends(get_user)],
background_tasks: BackgroundTasks,
task_id: str,
) -> ApiGeneralResponse:
task_config_crud = TaskConfigCRUD(db=db)
Expand All @@ -27,6 +28,6 @@ def run_task(
if config.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")

task_service = TaskService(config=config)
task_service.run()
return ApiGeneralResponse(message="Task triggered successfully")
background_tasks.add_task(scheduled_task_handler, current_user.id, PyObjectId(task_id))

return ApiGeneralResponse(message="Task started in background")
18 changes: 18 additions & 0 deletions app/schemas/configs/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,30 @@
COLLECTION_NAME = "task_configs"


class TaskType(StrEnum):
NORMAL = "normal"
REZPONZA = "rezponza"


class TaskConfig(BaseModel):
user_id: PyObjectId | None
task_name: str
task_type: TaskType = TaskType.NORMAL
description: str
fetch_config: FetchConfig
enigx_config: EnigxConfig
interval_secs: int | None = None
task_args: dict[str, str] = {}

class FieldName(StrEnum):
user_id = "user_id"
task_name = "task_name"
task_type = "task_type"
description = "description"
fetch_config = "fetch_config"
enigx_config = "enigx_config"
interval_secs = "interval_secs"
task_args = "task_args"

@field_validator(FieldName.interval_secs.value, mode="before")
@classmethod
Expand All @@ -46,3 +55,12 @@ def update_with_apjob(self, apjob: Job | None) -> None:
if apjob is not None:
self.is_scheduled = True
self.next_run_time = apjob.next_run_time


class PutResult(BaseModel):
    """Counters summarising the outcome of a put-to-knowledge run."""

    # Number of items that were put to knowledge successfully.
    put_count: int
    # Number of items skipped because an item with the same name already existed.
    duplicated_count: int


class RezponzaTaskArgs(StrEnum):
KBID = "kbid"
36 changes: 29 additions & 7 deletions app/services/enigx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any

import requests
from loguru import logger
from fastapi import HTTPException, status
from requests import Response

from app.core.config.fetch import config as config_fetch
Expand All @@ -24,6 +24,23 @@ def get_def_headers(self) -> dict[str, str]:
"Authorization": self.bearer_token,
}

def fetch_knowledge(self) -> list[dict[str, Any]]:
    """Fetch the knowledge entries of this project from the Enigx API.

    Returns:
        The decoded JSON body of the response, which must be a JSON list.

    Raises:
        HTTPException: 500 when the response body is not valid JSON or is
            not a JSON list (e.g. an error object returned by the API).
    """
    resp = requests.get(
        url=f"https://api.enigx.com/v1/projects/{self.project_id}/knowledge",
        headers={
            **self.get_def_headers(),
            "Content-Type": "application/json",
        },
        timeout=config_fetch.REQUEST_TIMEOUT,
    )

    try:
        resp_json = resp.json()
    except ValueError as err:
        # A non-JSON body (HTML error page, empty response) makes ``json()`` raise a
        # ValueError subclass; report it the same way as any other unusable response.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch knowledge",
        ) from err

    if not isinstance(resp_json, list):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch knowledge",
        )
    return resp_json

def put_to_knowledge(self, data: FetchData) -> None:
if data.type is FetchDataType.FILE:
if isinstance(data.data, bytes):
Expand All @@ -32,11 +49,16 @@ def put_to_knowledge(self, data: FetchData) -> None:
msg = f"Invalid file data type: {type(data.data)}"
raise TypeError(msg)
else:
resp = self.put_text(json.dumps(data.data, default=str))
resp = self.put_text(
data.name,
json.dumps(
data.data,
ensure_ascii=False,
default=str,
),
)

if resp.status_code == HTTPStatus.OK:
logger.debug(json.dumps(resp.json(), indent=2))
else:
if resp.status_code != HTTPStatus.OK:
msg = f"Enigx error: {resp.status_code}: {resp.text}"
raise ValueError(msg)

Expand All @@ -60,15 +82,15 @@ def put_file(self, filename: str | None, file_bytes: bytes, options: dict[str, A
timeout=config_fetch.REQUEST_TIMEOUT,
)

def put_text(self, text: str) -> Response:
def put_text(self, name: str | None, text: str) -> Response:
return requests.post(
url=f"https://api.enigx.com/v1/projects/{self.project_id}/knowledge/text",
headers={
**self.get_def_headers(),
"Content-Type": "application/json",
},
json={
"name": f"text_{datetime.now(tz=UTC).isoformat()}",
"name": name or f"text_{datetime.now(tz=UTC).isoformat()}",
"text": text,
},
timeout=config_fetch.REQUEST_TIMEOUT,
Expand Down
6 changes: 3 additions & 3 deletions app/services/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class FetchService:
def __init__(self, config: FetchConfig) -> None:
self.config = config

def fetch(self) -> FetchData:
def fetch(self, url: str | None = None) -> FetchData:
auth_header: dict[str, str] = {}
auth_query_data: dict[str, str] = {}

Expand All @@ -21,14 +21,14 @@ def fetch(self) -> FetchData:

if self.config.method is FetchMethod.GET:
resp = requests.get(
url=self.config.url,
url=url or self.config.url,
headers=auth_header,
params=auth_query_data,
timeout=config_fetch.REQUEST_TIMEOUT,
)
elif self.config.method is FetchMethod.POST:
resp = requests.post(
url=self.config.url,
url=url or self.config.url,
headers=auth_header,
params=auth_query_data,
timeout=config_fetch.REQUEST_TIMEOUT,
Expand Down
3 changes: 1 addition & 2 deletions app/services/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ def scheduled_task_handler(user_id: PyObjectId, task_id: PyObjectId) -> None:
task_config = task_config_list[0]

logger.info(
"Run scheduled task...\n" # noqa: ISC003
"Run background task...\n" # noqa: ISC003
+ f" task_name: {task_config.task_name}\n"
+ f" description: {task_config.description}\n"
+ f" interval: {task_config.interval_secs}"
)

task_service = TaskService(config=task_config)
Expand Down
106 changes: 104 additions & 2 deletions app/services/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from bs4 import BeautifulSoup
from fastapi import HTTPException
from loguru import logger

from app.schemas.configs.task import TaskConfig
from app.schemas.configs.fetch import FetchData, FetchDataType
from app.schemas.configs.task import PutResult, RezponzaTaskArgs, TaskConfig, TaskType
from app.services.enigx import EnigxService
from app.services.fetcher import FetchService

Expand All @@ -15,7 +18,17 @@ def __init__(self, config: TaskConfig) -> None:
config.enigx_config.bearer_token,
)

def run(self) -> None:
def run(self) -> PutResult | None:
    """Run the task in the mode selected by ``self.config.task_type``.

    Returns:
        The ``PutResult`` of a Rezponza task, or ``None`` for a normal task.

    Raises:
        ValueError: If the configured task type is not one of the handled types.
    """
    if self.config.task_type is TaskType.NORMAL:
        self.run_as_normal()
        # Return explicitly: without it a successful normal run would fall
        # through to the "Invalid task type" error below.
        return None

    if self.config.task_type is TaskType.REZPONZA:
        return self.run_as_rezponza()

    msg = f"Invalid task type: {self.config.task_type}"
    raise ValueError(msg)

def run_as_normal(self) -> None:
try:
fetch_data = self.fetch_service.fetch()
except: # noqa: E722
Expand All @@ -25,3 +38,92 @@ def run(self) -> None:
self.enigx_service.put_to_knowledge(fetch_data)
except: # noqa: E722
raise HTTPException(status_code=500, detail="Failed to put to knowledge") # noqa: B904

def run_as_rezponza(self) -> PutResult:
    """Copy the articles of a Rezponza knowledge base into Enigx knowledge.

    Steps:
        1. Fetch the article list from ``<fetch_config.url>/<kbid>/articles``.
        2. Skip articles whose item name already exists in Enigx knowledge.
        3. Fetch every remaining article and reduce each section's HTML
           content to plain text.
        4. Put each article to Enigx knowledge as JSON.

    A failure while fetching or putting a single article is logged and that
    article is skipped; the remaining articles are still processed.

    Returns:
        PutResult holding the number of articles put and the number skipped
        as duplicates.

    Raises:
        HTTPException: 500 when the article list or the existing knowledge
            cannot be fetched or processed.
    """
    kbid = self.config.task_args.get(RezponzaTaskArgs.KBID)
    base_url = self.config.fetch_config.url.strip("/")

    # Fetch Article List
    try:
        logger.debug("fetch article list...")
        article_list_url = f"{base_url}/{kbid}/articles"
        article_info_data = self.fetch_service.fetch(url=article_list_url)

        logger.debug("fetch article list from enigx...")
        existing_articles = self.enigx_service.fetch_knowledge()
        # A set keeps the duplicate check O(1) per article. Only string names can
        # ever equal the generated item name, so other values are left out.
        existing_names = {
            name for name in (data.get("name") for data in existing_articles) if isinstance(name, str)
        }

        logger.debug("removing duplications...")
        duplicated_count = 0
        article_info_list = []
        if isinstance(article_info_data.data, list):
            for article_info in article_info_data.data:
                # NOTE(review): this name is prefixed with the configured kbid, while the
                # name stored below is prefixed with the article's own "knowledgebaseId".
                # Duplicate detection only works when both render identically - confirm.
                item_name = f"{kbid}_{article_info.get('articleId')}_{article_info.get('title')}"
                if item_name in existing_names:
                    logger.debug(f"duplicated article: {item_name}")
                    duplicated_count += 1
                    continue
                article_info_list.append(article_info)
    except Exception as err:
        raise HTTPException(status_code=500, detail="Failed to fetch article list") from err

    # Fetch Articles
    logger.debug("fetch articles...")
    article_data_list: list[FetchData] = []
    for article_info in article_info_list:
        # Bound before the try so the error log below can always reference it,
        # even when reading the id itself is what failed.
        article_id = None
        try:
            article_id = article_info.get("articleId")
            logger.debug(f"article_id: {article_id}...")

            article_url = f"{base_url}/{kbid}/articles/{article_id}"
            article = self.fetch_service.fetch(url=article_url).data
            if not isinstance(article, dict):
                continue

            sections = article.get("sections")
            if isinstance(sections, list):
                for section in sections:
                    html_content = section.get("content")
                    if not isinstance(html_content, str):
                        # Nothing to clean; keep the section as it is rather than
                        # dropping the whole article over one missing content field.
                        continue

                    # Cleanup HTML data
                    soup = BeautifulSoup(html_content, "html.parser")
                    for tag in soup(["script", "style"]):
                        tag.decompose()  # Removes the tag completely
                    section["content"] = soup.get_text(separator="\n", strip=True)

            item_name = f"{article.get('knowledgebaseId')}_{article.get('articleId')}_{article.get('title')}"
            article_data_list.append(
                FetchData(
                    name=item_name,
                    type=FetchDataType.JSON,
                    data=article,
                )
            )
        except Exception:  # best effort: one broken article must not stop the run
            logger.exception(f"failed to fetch article: {article_id}")

    # Put to knowledge
    logger.debug("put articles to knowledge...")
    put_count = 0
    for article_data in article_data_list:
        try:
            logger.debug(f"item_name: {article_data.name}")
            self.enigx_service.put_to_knowledge(article_data)
            put_count += 1
        except Exception:  # best effort: keep putting the remaining articles
            logger.exception(f"failed to put to knowledge: {article_data.name}")

    logger.debug("finished task")

    return PutResult(
        put_count=put_count,
        duplicated_count=duplicated_count,
    )
76 changes: 76 additions & 0 deletions experimental/mock_razponza_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from datetime import datetime
from typing import List

from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn

# FastAPI application that serves the mock article endpoints defined below.
app = FastAPI()


class ArticleSummary(BaseModel):
    """Article entry as returned by the article-list endpoint."""

    articleId: int
    title: str
    summary: str
    lastModified: datetime


class Section(BaseModel):
    """One titled section of an article; ``content`` holds the section body (HTML in the mock data)."""

    sectionTitle: str
    content: str


class ArticleDetail(ArticleSummary):
    """Full article: the summary fields plus its knowledge-base id and sections."""

    knowledgebaseId: int
    sections: List[Section]


# Mock data
# Article summaries returned by the list endpoint, whatever knowledge-base id is requested.
articles = [
{
"articleId": 527,
"title": "Erhvervsaftale",
"summary": "B2B oprettelse, erhvervskunder - kunden opretter sig på thansen.dk",
"lastModified": "2024-11-26T15:09:18.297",
},
{
"articleId": 530,
"title": "Oprettelse af klubkort til erhvervskunder",
"summary": "Du opretter og ser eksisterende klubkort ved at trykke F10 ⇒ H på kundenummeret",
"lastModified": "2024-08-19T13:45:45.413",
},
]

# Full article payloads keyed by articleId, served by the detail endpoint.
# Only article 527 has an entry here; article 530 appears in `articles` but has no detail.
article_details = {
527: {
"articleId": 527,
"knowledgebaseId": 25,
"title": "Erhvervsaftale",
"summary": "B2B oprettelse, erhvervskunder - kunden opretter sig på thansen.dk",
"lastModified": "2024-11-26T15:09:18.297",
"sections": [
{
"sectionTitle": "Information",
"content": "<h4><strong>Erhvervsaftale</strong></h4>\n\n<p>Virksomheder og offentlige instanser, har mulighed for at blive registreret som erhvervskunde.&nbsp;</p>\n\n<p>Hvis en kunde &oslash;nsker at f&aring; en erhvervsaftale, kan kunden g&oslash;re det via <a href=\"https://www.thansen.dk/kundeservice/erhvervsaftale/n-236577680\" target=\"_blank\">hjemmesiden</a>.&nbsp;</p>\n\n<p>Med en erhvervsaftale f&aring;r kunden;</p>\n\n<ul>\n\t<li>Personligt B2B-kort til brug for shopping i alle vores thansen butikker</li>\n\t<li>Dedikeret r&aring;dgivning via vores B2B afdeling</li>\n\t<li>Intelligent og brugervenlig selvbetjeningsl&oslash;sning p&aring; thansen.dk</li>\n\t<li>Se priser og f&aring; overblik over ordrehistorik samt ordrestatus</li>\n\t<li>Prismatch - Se betingelserne her (Link til betingelser)</li>\n\t<li>Opn&aring; rabat ved k&oslash;b for mere end 25.000 kr. &aring;rligt</li>\n\t<li>Mulighed for fakturabetaling (Foruds&aelig;tter kreditgodkendelse)</li>\n\t<li>Bestil online 24/7 - Ved bestilling inden kl. 18:00 leverer vi varen dagen efter</li>\n\t<li>Afhentning, returnering og ombytning alle ugens 7 dage</li>\n</ul>\n\n<p><strong>Mistet erhvervskort</strong></p>\n\n<p>Hvis en erhvervskunde har mistet sit erhvervskort, skal du sende en mail til erhverv@thansen.dk.&nbsp;</p>\n\n<p>I mailen skal du huske at vedh&aelig;ftet kundenummeret.&nbsp;</p>\n\n<p>Beskriv at kunden har mistet sit kort, og derfor er n&oslash;dt til at f&aring; et nyt.</p>\n",
},
{"sectionTitle": "Arbejdsgang", "content": ""},
],
}
}


@app.get("/kb/{kb_id}/articles", response_model=List[ArticleSummary])
async def get_articles(kb_id: int):
    """Return every mock article summary; ``kb_id`` is accepted but not used for filtering."""
    return articles


@app.get("/kb/{kb_id}/articles/{article_id}", response_model=ArticleDetail)
async def get_article_detail(kb_id: int, article_id: int):
    """Return the mock detail payload for ``article_id``.

    ``kb_id`` is accepted but not used for the lookup.

    Raises:
        HTTPException: 404 when no detail is stored for ``article_id``.
    """
    # Imported locally because the module only imports ``FastAPI`` at the top.
    from fastapi import HTTPException

    article = article_details.get(article_id)
    if not article:
        # An ``{"error": ...}`` dict cannot be validated against ``ArticleDetail`` and
        # would surface as a server error; answer with a proper 404 instead.
        raise HTTPException(status_code=404, detail="Article not found")
    return article


if __name__ == "__main__":
    # Serve the mock API on port 8001 when this file is executed as a script.
    uvicorn.run(app=app, port=8001)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.ruff]
target-version = "py312"
line-length = 120
exclude = [".venv"]
exclude = [".venv", "experimental"]

[tool.ruff.lint]
select = ["ALL"]
Expand Down
Loading