Skip to content

AI recons its done good #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 111 additions & 52 deletions chronos/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,50 @@ async def webhook_request(client: AsyncClient, url: str, endpoint_id: int, *, we
'Content-Type': 'application/json',
'webhook-signature': webhook_sig,
}
request_data = RequestData(
endpoint_id=endpoint_id, request_headers=json.dumps(headers), request_body=json.dumps(data)
)
with logfire.span('{method=} {url!r}', url=url, method='POST'):
r = None
try:
if not data or (isinstance(data, dict) and not data.get('events')):
request_data.status_code = 999
request_data.response_headers = json.dumps({})
request_data.response_body = json.dumps({'error': 'Empty payload'})
request_data.successful_response = False
return request_data

if isinstance(data, dict) and data.get('events') and not data['events'][0].get('data'):
request_data.status_code = 999
request_data.response_headers = json.dumps({})
request_data.response_body = json.dumps({'error': 'Empty event data'})
request_data.successful_response = False
return request_data

r = await client.post(url=url, json=data, headers=headers, timeout=8)
request_data.response_headers = json.dumps(dict(r.headers))
try:
response_body = r.json()
except json.JSONDecodeError:
response_body = r.content.decode()
request_data.response_body = json.dumps(response_body)
request_data.status_code = r.status_code
request_data.successful_response = r.status_code in {200, 201, 202, 204}
return request_data
except httpx.TimeoutException as terr:
app_logger.info('Timeout error sending webhook to %s: %s', url, terr)
request_data.status_code = 999
request_data.response_headers = json.dumps({}) # Empty headers for timeout
request_data.response_body = json.dumps({'error': 'Timeout error'})
request_data.successful_response = False
raise terr
except httpx.HTTPError as httperr:
app_logger.info('HTTP error sending webhook to %s: %s', url, httperr)
request_data = RequestData(
endpoint_id=endpoint_id, request_headers=json.dumps(headers), request_body=json.dumps(data)
)
if r is not None:
request_data.response_headers = json.dumps(dict(r.headers))
request_data.response_body = json.dumps(r.content.decode())
request_data.status_code = r.status_code
request_data.successful_response = True
return request_data
response = getattr(httperr, 'response', httpx.Response(status_code=500))
request_data.status_code = response.status_code
request_data.response_headers = json.dumps(dict(response.headers) if response.headers else {})
request_data.response_body = json.dumps({'error': str(httperr)})
request_data.successful_response = False
raise httperr


acceptable_url_schemes = ('http', 'https', 'ftp', 'ftps')
Expand Down Expand Up @@ -107,21 +134,45 @@ async def _async_post_webhooks(endpoints, url_extension, payload):
if url_extension:
url += f'/{url_extension}'
# Send the Webhook to the endpoint
try:
loaded_payload = json.loads(payload)
task = asyncio.ensure_future(
webhook_request(client, url, endpoint.id, webhook_sig=sig_hex, data=loaded_payload)
)
tasks.append(task)
except json.JSONDecodeError:
app_logger.error('Failed to decode payload for endpoint %s', endpoint.id)
continue

loaded_payload = json.loads(payload)
task = asyncio.ensure_future(
webhook_request(client, url, endpoint.id, webhook_sig=sig_hex, data=loaded_payload)
)
tasks.append(task)
webhook_responses = await asyncio.gather(*tasks, return_exceptions=True)
for response in webhook_responses:
if isinstance(response, Exception):
app_logger.info('Error from endpoint %s: %s', endpoint.id, response)
webhook_logs.append(
WebhookLog(
webhook_endpoint_id=endpoint.id,
request_headers=json.dumps({}),
request_body=payload,
response_headers=json.dumps({}),
response_body=json.dumps({'error': str(response)}),
status='Unexpected response',
status_code=999,
)
)
total_failed += 1
continue

if not isinstance(response, RequestData):
app_logger.info('No response from endpoint %s: %s. %s', endpoint.id, endpoint.webhook_url, response)
continue
elif not response.successful_response:
app_logger.info('No response from endpoint %s: %s', endpoint.id, endpoint.webhook_url)

if response.status_code in {200, 201, 202, 204}:
try:
response_body = json.loads(response.response_body)
response_status = response_body.get('status', '').lower() # Default to empty string if not specified
except (json.JSONDecodeError, AttributeError):
response_status = 'success' # Default to success on parse error

if response.status_code in {200, 201, 202, 204} and response_status == 'success':
status = 'Success'
total_success += 1
else:
Expand Down Expand Up @@ -151,41 +202,49 @@ def task_send_webhooks(
"""
Send the webhook to the relevant endpoints
"""
loaded_payload = json.loads(payload)
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
qlength = get_qlength()

if loaded_payload.get('events'):
branch_id = loaded_payload['events'][0]['branch']
else:
branch_id = loaded_payload['branch_id']

if qlength > 100:
app_logger.error('Queue is too long. Check workers and speeds.')

app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
lf_span = 'Sending webhooks for branch: {branch_id=}'
with logfire.span(lf_span, branch_id=branch_id):
with Session(engine) as db:
# Get all the endpoints for the branch
endpoints_query = select(WebhookEndpoint).where(
WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active
)
endpoints = db.exec(endpoints_query).all()
try:
loaded_payload = json.loads(payload)
if 'request_time' in loaded_payload:
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
qlength = get_qlength()

if loaded_payload.get('events'):
branch_id = loaded_payload['events'][0]['branch']
else:
branch_id = loaded_payload['branch_id']

if qlength > 100:
app_logger.error('Queue is too long. Check workers and speeds.')

app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
lf_span = 'Sending webhooks for branch: {branch_id=}'
with logfire.span(lf_span, branch_id=branch_id):
with Session(engine) as db:
# Get all the endpoints for the branch
endpoints_query = select(WebhookEndpoint).where(
WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active
)
endpoints = db.exec(endpoints_query).all()

webhook_logs, total_success, total_failed = asyncio.run(
_async_post_webhooks(endpoints, url_extension, payload)
)
for webhook_log in webhook_logs:
db.add(webhook_log)
db.commit()
app_logger.info(
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',
total_success + total_failed,
branch_id,
total_success,
total_failed,
)
webhook_logs, total_success, total_failed = asyncio.run(
_async_post_webhooks(endpoints, url_extension, payload)
)
for webhook_log in webhook_logs:
db.add(webhook_log)
db.commit()
app_logger.info(
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',
total_success + total_failed,
branch_id,
total_success,
total_failed,
)
except json.JSONDecodeError as e:
app_logger.error('Failed to decode payload: %s', payload)
raise e
except Exception as e:
app_logger.error('Error sending webhooks: %s', str(e))
raise e


DELETE_JOBS_KEY = 'delete_old_logs_job'
Expand Down
12 changes: 11 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def create_tables(engine):
SQLModel.metadata.drop_all(engine)



@pytest.fixture(scope='session')
def celery_includes():
return ['chronos.worker']

@pytest.fixture
def session(engine, create_tables):
connection = engine.connect()
Expand All @@ -48,7 +53,12 @@ def get_session_override():

@pytest.fixture(scope='session')
def celery_config():
return {'broker_url': 'redis://', 'result_backend': 'redis://'}
return {
'broker_url': 'redis://',
'result_backend': 'redis://',
'task_always_eager': True,
'task_eager_propagates': True,
}


@pytest.fixture(scope='session')
Expand Down
34 changes: 21 additions & 13 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import hashlib
import hmac
import json

import httpx
from httpx import Response
from requests import Request

from chronos.main import app
from chronos.sql_models import WebhookEndpoint, WebhookLog
Expand Down Expand Up @@ -142,22 +144,28 @@ def create_webhook_log_from_dft_data(**kwargs) -> WebhookLog:


def get_successful_response(payload, headers, **kwargs) -> Response:
response_dict = {'status_code': 200, 'message': 'success'}
response_dict = {'status': 'success', 'message': 'success'}
for k, v in kwargs.items():
response_dict[k] = v
request = Request()
request.headers = headers
request.body = json.dumps(payload).encode()
response = Response(status_code=200, request=request, content=json.dumps(response_dict).encode())
return response
headers = headers.copy()
headers['webhook-signature'] = hmac.new(b'test_key', json.dumps(payload).encode(), hashlib.sha256).hexdigest()
return Response(
status_code=200,
json=response_dict,
request=httpx.Request('POST', 'https://example.com', json=payload, headers=headers),
headers=headers,
)


def get_failed_response(payload, headers, **kwargs) -> Response:
response_dict = {'status_code': 409, 'message': 'Bad request'}
response_dict = {'status': 'error', 'message': 'Bad request'}
for k, v in kwargs.items():
response_dict[k] = v
request = Request()
request.headers = headers
request.body = json.dumps(payload).encode()
response = Response(status_code=409, request=request, content=json.dumps(response_dict).encode())
return response
headers = headers.copy()
headers['webhook-signature'] = hmac.new(b'test_key', json.dumps(payload).encode(), hashlib.sha256).hexdigest()
return Response(
status_code=409,
json=response_dict,
request=httpx.Request('POST', 'https://example.com', json=payload, headers=headers),
headers=headers,
)
Loading