Skip to content

Callback Service Changes #1842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 112 additions & 61 deletions kairon/async_callback/main.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,62 @@
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
import asyncio
from mongoengine import connect, disconnect

from kairon.shared.account.processor import AccountProcessor
from kairon.shared.utils import Utility
from kairon.shared.account.processor import AccountProcessor

Utility.load_environment()

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from kairon.async_callback.router import pyscript_callback

from blacksheep import Application, Request, Response
from blacksheep.server.responses import json as JSONResponse
from loguru import logger
from secure import StrictTransportSecurity, ReferrerPolicy, ContentSecurityPolicy, XContentTypeOptions, Server, \
CacheControl, Secure, PermissionsPolicy

from kairon.api.models import Response
from kairon.async_callback.router import pyscript_callback
from kairon.shared.otel import instrument_fastapi


async def startup(app: Application):
"""MongoDB is connected on the bot trainer startup"""
config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"])
connect(**config)
print("Connecting to MongoDB...")
await asyncio.sleep(1)
await AccountProcessor.default_account_setup()
AccountProcessor.load_system_properties()
print("MongoDB connected.")


async def shutdown(app: Application):
"""Disconnect MongoDB on shutdown"""
disconnect()
print("Disconnecting from MongoDB...")
await asyncio.sleep(1)
print("MongoDB disconnected.")


app = Application(router=pyscript_callback.router)
app.on_start += startup
app.on_stop += shutdown

hsts = StrictTransportSecurity().include_subdomains().preload().max_age(31536000)
referrer = ReferrerPolicy().no_referrer()
csp = (
ContentSecurityPolicy().default_src("'self'")
.frame_ancestors("'self'")
.form_action("'self'")
.base_uri("'self'")
.connect_src("'self'" "api.spam.com")
.frame_src("'self'")
.img_src("'self'", "static.spam.com")
.frame_ancestors("'self'")
.form_action("'self'")
.base_uri("'self'")
.connect_src("'self' api.spam.com")
.frame_src("'self'")
.img_src("'self'", "static.spam.com")
)
cache_value = CacheControl().must_revalidate()
content = XContentTypeOptions()
server = Server().set("Secure")
permissions_value = (
PermissionsPolicy().accelerometer("").autoplay("").camera("").document_domain("").encrypted_media("")
.fullscreen("").geolocation("").gyroscope("").magnetometer("").microphone("").midi("").payment("")
.picture_in_picture("").sync_xhr("").usb("").geolocation("self", "'spam.com'").vibrate()
.fullscreen("").geolocation("").gyroscope("").magnetometer("").microphone("").midi("").payment("")
.picture_in_picture("").sync_xhr("").usb("").geolocation("self", "'spam.com'").vibrate()
)
secure_headers = Secure(
server=server,
Expand All @@ -48,61 +68,92 @@
content=content
)

@asynccontextmanager
async def lifespan(app: FastAPI):
""" MongoDB is connected on the bot trainer startup """
config: dict = Utility.mongoengine_connection(Utility.environment['database']["url"])
connect(**config)
await AccountProcessor.default_account_setup()
AccountProcessor.load_system_properties()
yield
disconnect()


app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
app.use_cors(
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["content-disposition"],
allow_credentials=True
)
app.add_middleware(GZipMiddleware)
instrument_fastapi(app)


@app.middleware("http")
async def add_secure_headers(request: Request, call_next):
"""Add security headers."""
response = await call_next(request)
secure_headers.framework.fastapi(response)
return response


@app.middleware("http")
async def catch_exceptions_middleware(request: Request, call_next):
async def catch_exceptions_middleware(request: Request, handler):
try:
return await call_next(request)
response = await handler(request)
return response
except Exception as exc:
logger.exception(exc)
error_response = {
"success": False,
"error_code": 500,
"message": str(exc),
}
return JSONResponse(error_response, status=500)
hsts_config = {
"include_subdomains": True,
"preload": True,
"max_age": 31536000,
}
referrer_config = {"policy": "no-referrer"}
csp_config = {
"default-src": ["'self'"],
"frame-ancestors": ["'self'"],
"form-action": ["'self'"],
"base-uri": ["'self'"],
"connect-src": ["'self'", "api.spam.com"],
"frame-src": ["'self'"],
"img-src": ["'self'", "static.spam.com"],
"script-src": ["'self'"],
"style-src": ["'self'"]
}
cache_control_config = {"must-revalidate": True}
permissions_config = {
"geolocation": ["self", "spam.com"],
}


def generate_hsts_header(config):
directives = [f"max-age={config['max_age']}"]
if config["include_subdomains"]:
directives.append("includeSubDomains")
if config["preload"]:
directives.append("preload")
return "; ".join(directives)


def generate_csp_header(config):
directives = []
for directive, sources in config.items():
if sources:
source_string = " ".join(sources)
directives.append(f"{directive} {source_string}")
return "; ".join(directives)


def generate_cache_control_header(config):
directives = []
if config["must-revalidate"]:
directives.append("must-revalidate")
return ", ".join(directives)


async def add_security_headers_middleware(request, handler):
response = await handler(request)
response.headers[b"Strict-Transport-Security"] = generate_hsts_header(hsts_config).encode("utf-8")
response.headers[b"Referrer-Policy"] = referrer_config["policy"].encode("utf-8")
response.headers[b"Content-Security-Policy"] = generate_csp_header(csp_config).encode("utf-8")
response.headers[b"Cache-Control"] = generate_cache_control_header(cache_control_config).encode("utf-8")
return response

return JSONResponse(
Response(
success=False, error_code=422, message=str(exc)
).dict()
)


@app.get("/", response_model=Response)
def index():
return {"message": "Running Async Callback Server"}

app.middlewares.append(add_security_headers_middleware)
app.middlewares.append(catch_exceptions_middleware)

@app.get("/healthcheck", response_model=Response)
def healthcheck():
return {"message": "health check ok"}

@app.router.get("/")
async def index():
return Response(message="Running BlackSheep Async Callback Server")

app.include_router(pyscript_callback.router)

@app.router.get("/healthcheck")
async def healthcheck():
return Response(message="Health check OK")
18 changes: 7 additions & 11 deletions kairon/async_callback/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
from loguru import logger
from kairon import Utility
from kairon.async_callback.channel_message_dispacher import ChannelMessageDispatcher
from kairon.async_callback.utils import CallbackUtility
from kairon.evaluator.processor import EvaluatorProcessor
from kairon.exceptions import AppException
from kairon.shared.callback.data_objects import CallbackData, CallbackConfig, CallbackLog, CallbackExecutionMode
from kairon.shared.cloud.utils import CloudUtility
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import TASK_TYPE
from kairon.shared.callback.data_objects import CallbackData, CallbackLog, CallbackExecutionMode

async_task_executor = ThreadPoolExecutor(max_workers=64)

Expand All @@ -26,16 +24,14 @@ def run_pyscript(script: str, predefined_objects: dict):
try:
if trigger_task:
logger.info("Triggering lambda for pyscript evaluation")
lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, {
response = CallbackUtility.pyscript_handler({
'source_code': script,
'predefined_objects': predefined_objects
}, task_type=TASK_TYPE.CALLBACK.value)
if CloudUtility.lambda_execution_failed(lambda_response):
err = lambda_response['Payload'].get('body') or lambda_response
}, None)
if response["statusCode"] != 200:
err = response.get('body') or response
raise AppException(f"{err}")
if err := lambda_response["Payload"].get('errorMessage'):
raise AppException(f"{err}")
result = lambda_response["Payload"].get('body')
result = response.get('body')
return result
else:
logger.info("Triggering local_evaluator for pyscript evaluation")
Expand Down
Loading
Loading