-
Notifications
You must be signed in to change notification settings - Fork 197
Open
Description
Steps to reproduce
Create a faust.App instance during testing.
For this example there are only two files: an app.py file with a single topic and an event handler,
and a test.py file that calls this handler.
#### src/kafka/app.py
import faust
class EngineState(faust.Record):
    """Faust record with a single string field, ``state``."""

    state: str
class EngineCommand(faust.Record):
    """Faust record with a single string field, ``command``."""

    command: str
# Application instance; constructing it at import time is what triggers the
# fixup discovery seen in the traceback (faust/app/base.py -> _init_fixups).
app = faust.App(
    id=GROUP_ID,
    broker=BROKER,
    broker_credentials=broker_credentials,
    topic_disable_leader=True,
    autodiscover=('tickseed.src.kafka.handlers',),
)

# Topic whose values are JSON-serialized EngineState records.
changed_topic = app.topic(
    'servis-changed',
    value_type=EngineState,
    value_serializer='json',
)
@app.agent(changed_topic)
async def changed_topic_agent(stream: AsyncIterable[Dict]) -> None:
    """Consume ``changed_topic`` and pass each event to ``engine_state_changed``.

    Every event is logged, converted with ``EngineState.from_data`` and then
    handed to the handler, one event at a time.
    """
    async for event in stream:
        logger.info(f'Received state engine change message {event}')  # noqa: WPS305
        engine_state: EngineState = EngineState.from_data(event)
        await engine_state_changed(engine_state)
async def engine_state_changed(engine_state: EngineState) -> None:
    """Handle an engine state change by saving a ``copy_files_to_s3`` command.

    In this example ``engine_state`` is accepted but not inspected; the same
    command payload is passed to ``save_to_db`` on every call.
    """
    await save_to_db({'command': 'copy_files_to_s3'})
async def save_to_db(data: dict) -> None:
    """Placeholder persistence hook; the example body intentionally does nothing.

    Args:
        data: Payload to store (the example passes ``{'command': ...}``).
    """
#### tests/test.py
import pytest
from src.kafka.app import engine_state_changed
@pytest.fixture
def mock_save_to_db():
    """Patch ``src.kafka.app.save_to_db`` for one test and yield the mock."""
    # The patch is active only while the test body runs; leaving the ``with``
    # block after the yield restores the original function.
    with patch('src.kafka.app.save_to_db') as mocked:
        yield mocked
@pytest.mark.anyio
async def test_changed_topic(
    mock_save_to_db,
) -> None:
    """Call the handler directly with a prepared record; expect one save."""
    response_faust_record = EngineState.from_data({'state': 'file_redy'})
    await engine_state_changed(response_faust_record)
    mock_save_to_db.assert_awaited_once()
Expected behavior
I want to test Kafka event handlers by calling them directly with a prepared dataset, working directly with faust.Record.
I also want to use the test log in the pipeline.
Actual behavior
Testing works, but as a side effect I get: DeprecationWarning: pkg_resources is deprecated as an API
Full traceback
$ python -Werror -m pytest
================================================================================ test session starts =================================================================================
platform linux -- Python 3.11.10, pytest-8.3.2, pluggy-1.5.0
rootdir: /home/dmitry/
configfile: pytest.ini
plugins: metadata-3.1.1, cov-5.0.0, Faker-25.9.2, anyio-4.4.0, html-4.1.1
collected 44 items / 1 error
======================================================================================= ERRORS =======================================================================================
from src.kafka.app import app
src/kafka/app.py:11: in <module>
app = faust.App(
.venv/lib/python3.11/site-packages/faust/app/base.py:498: in __init__
self.fixups = self._init_fixups()
.venv/lib/python3.11/site-packages/faust/app/base.py:550: in _init_fixups
return list(fixups(self))
.venv/lib/python3.11/site-packages/faust/fixups/__init__.py:28: in fixups
for Fixup in FIXUPS.iterate():
.venv/lib/python3.11/site-packages/mode/utils/imports.py:96: in iterate
self._maybe_finalize()
.venv/lib/python3.11/site-packages/mode/utils/imports.py:132: in _maybe_finalize
self._finalize()
.venv/lib/python3.11/site-packages/mode/utils/imports.py:136: in _finalize
self.aliases.update(dict(load_extension_class_names(namespace)))
.venv/lib/python3.11/site-packages/mode/utils/imports.py:371: in load_extension_class_names
from pkg_resources import iter_entry_points
.venv/lib/python3.11/site-packages/pkg_resources/__init__.py:111: in <module>
warnings.warn(
E DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
============================================================================== short test summary info ===============================================================================
ERROR tests/test.py - DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
================================================================================== 1 error in 0.66s ==================================================================================
Versions
- Python version 3.11
- Faust version faust-streaming = "^0.11.0"
- Operating system ubuntu 22.04
- Kafka version Docker image: [apache/kafka:3.8.0]
- RocksDB version (if applicable) n/a
Metadata
Metadata
Assignees
Labels
No labels