Skip to content

fix: run notifications in background #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fastapi_jsonrpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,15 @@ async def handle_req(
RequestValidationError(_normalize_errors(solved_dependency.errors))
)

# We MUST NOT return response for Notification
# https://www.jsonrpc.org/specification#notification
# Since we do not need response - run in scheduler
if ctx.request.id is None:
scheduler = await self.entrypoint.get_scheduler()
await scheduler.spawn(call_sync_async(self.func, **solved_dependency.values))
return {}

# Для обычных запросов продолжаем как раньше
result = await call_sync_async(self.func, **solved_dependency.values)

response = {
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rst_include = "^2.1.0"
pytest = "^6.2"
sentry-sdk = "^2.0"
requests = ">0.0.0"
httpx = ">=0.23.0,<0.24.0" # FastAPI/Starlette extra test deps
httpx = ">=0.27.0,<0.29.0" # FastAPI/Starlette extra test deps

[build-system]
requires = ["poetry>=0.12"]
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ def requester(method, params, request_id=0):
return requester


@pytest.fixture
def ep_wait_all_requests_done(app_client, ep):
"""Returns function which waits until inner scheduler was empty
That's means all requests are done
"""
def wait_empty(ep=ep):
app_client.portal.call(ep.scheduler.wait_and_close)

return wait_empty


@pytest.fixture
def openapi_compatible():
supported_openapi_versions = [packaging.version.parse("3.0.2"), packaging.version.parse("3.1.0")]
Expand Down
14 changes: 10 additions & 4 deletions tests/test_jsonrpc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import threading

from json import dumps as json_dumps
from typing import List

Expand Down Expand Up @@ -48,28 +50,30 @@ def test_no_params(echo, json_request):


@pytest.mark.parametrize('request_id', [111, 'qwe'])
def test_basic(echo, json_request, request_id):
def test_basic(echo, json_request, request_id, ep_wait_all_requests_done):
resp = json_request({
'id': request_id,
'jsonrpc': '2.0',
'method': 'echo',
'params': {'data': 'data-123'},
})
assert resp == {'id': request_id, 'jsonrpc': '2.0', 'result': 'data-123'}
ep_wait_all_requests_done()
assert echo.history == ['data-123']


def test_notify(echo, raw_request):
def test_notify(echo, raw_request, ep_wait_all_requests_done):
resp = raw_request(json_dumps({
'jsonrpc': '2.0',
'method': 'echo',
'params': {'data': 'data-123'},
}))
assert not resp.content
ep_wait_all_requests_done()
assert echo.history == ['data-123']


def test_batch_notify(echo, raw_request):
def test_batch_notify(echo, raw_request, ep_wait_all_requests_done):
resp = raw_request(json_dumps([
{
'jsonrpc': '2.0',
Expand All @@ -83,6 +87,7 @@ def test_batch_notify(echo, raw_request):
},
]))
assert not resp.content
ep_wait_all_requests_done()
assert set(echo.history) == {'data-111', 'data-222'}


Expand Down Expand Up @@ -279,7 +284,7 @@ def test_method_not_found(echo, json_request):
assert echo.history == []


def test_batch(echo, json_request):
def test_batch(echo, json_request, ep_wait_all_requests_done):
resp = json_request([
{
'id': 111,
Expand Down Expand Up @@ -310,6 +315,7 @@ def test_batch(echo, json_request):
{'id': 'qwe', 'jsonrpc': '2.0', 'result': 'data-qwe'},
{'id': 'method-not-found', 'jsonrpc': '2.0', 'error': {'code': -32601, 'message': 'Method not found'}},
]
ep_wait_all_requests_done()
assert set(echo.history) == {'data-111', 'data-notify', 'data-qwe'}


Expand Down
174 changes: 174 additions & 0 deletions tests/test_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import asyncio
import collections
import time
from typing import Dict

import pytest
from fastapi import Body


class ExecutionTracker:
def __init__(self):
self.executions = []
self.last_execution_time = 0

def record(self, method_name, delay):
self.executions.append((method_name, delay))
self.last_execution_time = time.monotonic()


@pytest.fixture
def tracker():
return ExecutionTracker()


@pytest.fixture
def ep(ep, tracker):
@ep.method()
async def delayed_method(
delay: float = Body(..., ge=0),
message: str = Body(...),
) -> dict:
start_time = time.monotonic()
await asyncio.sleep(delay)
tracker.record("delayed_method", delay)
return {"message": message, "execution_time": time.monotonic() - start_time}

@ep.method()
async def instant_method(
message: str = Body(...),
) -> Dict[str, str]:
tracker.record("instant_method", 0)
return {"message": message}

return ep


def test_regular_request__no_background(app, json_request, tracker):
start_time = time.monotonic()
delay = 0.5

# Запрос с ID (синхронный)
response = json_request(
{
"jsonrpc": "2.0",
"method": "delayed_method",
"params": {"delay": delay, "message": "sync request"},
"id": 1
}
)

execution_time = time.monotonic() - start_time

# Проверяем, что время выполнения больше чем задержка (т.е. запрос ждал завершения)
assert execution_time >= delay
assert response == {
"jsonrpc": "2.0",
"result": {
"message": "sync request",
"execution_time": pytest.approx(delay, abs=0.1)
},
"id": 1
}
assert len(tracker.executions) == 1
assert tracker.executions[0][0] == "delayed_method"


def test_single_request__notification_in_background(app, app_client, tracker, ep_wait_all_requests_done):
start_time = time.monotonic()
delay = 0.5

# Запрос без ID (уведомление, должен выполниться асинхронно)
response = app_client.post(
"/api/v1/jsonrpc",
json={
"jsonrpc": "2.0",
"method": "delayed_method",
"params": {"delay": delay, "message": "async notification"}
}
)

execution_time = time.monotonic() - start_time

# Проверяем, что время выполнения меньше чем задержка (т.е. запрос не ждал завершения)
assert execution_time < delay
assert response.status_code == 200
assert response.content == b'' # Пустой ответ для уведомления

# Ждем, чтобы убедиться что задача завершилась
ep_wait_all_requests_done()

# Проверяем, что функция действительно была выполнена
assert len(tracker.executions) == 1
assert tracker.executions[0][0] == "delayed_method"


def test_batch_request__notification_in_background(app, app_client, tracker, ep_wait_all_requests_done):
start_time = time.monotonic()
delay1 = 0.5
delay2 = 0.3

# Batch-запрос с обычными запросами и уведомлениями
response = app_client.post(
"/api/v1/jsonrpc",
json=[
# Обычный запрос
{
"jsonrpc": "2.0",
"method": "delayed_method",
"params": {"delay": delay1, "message": "sync request 1"},
"id": 1
},
# Уведомление
{
"jsonrpc": "2.0",
"method": "delayed_method",
"params": {"delay": delay2, "message": "notification 1"}
},
# Еще один обычный запрос
{
"jsonrpc": "2.0",
"method": "instant_method",
"params": {"message": "sync request 2"},
"id": 2
},
# Еще одно уведомление
{
"jsonrpc": "2.0",
"method": "instant_method",
"params": {"message": "notification 2"}
}
]
)

execution_time = time.monotonic() - start_time

# Проверяем, что время выполнения больше чем максимальная задержка среди обычных запросов
assert execution_time >= delay1
assert response.status_code == 200

result = response.json()
# В ответе должны быть только запросы с ID
assert len(result) == 2

# Проверяем содержимое ответов (порядок может быть любым)
result_dict = {item["id"]: item for item in result}

assert result_dict[1]["jsonrpc"] == "2.0"
assert result_dict[1]["result"]["message"] == "sync request 1"
assert float(result_dict[1]["result"]["execution_time"]) >= delay1

assert result_dict[2]["jsonrpc"] == "2.0"
assert result_dict[2]["result"]["message"] == "sync request 2"

# Ждем, чтобы убедиться что все задачи завершились
ep_wait_all_requests_done()

# Проверяем что все функции действительно были выполнены (всего 4)
assert len(tracker.executions) == 4

# Проверяем типы выполненных функций (должны быть 2 delayed_method и 2 instant_method)
method_counts = collections.Counter((x[0] for x in tracker.executions))

assert method_counts["delayed_method"] == 2
assert method_counts["instant_method"] == 2