Skip to content

release 0.2.0 #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: python -m build

- name: Publish
uses: pypa/gh-action-pypi-publish@v1.8.14
uses: pypa/gh-action-pypi-publish@v1.9.0
with:
password: ${{ secrets.PYPI_TOKEN }}

Expand Down
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ classifiers = [
dynamic = ["version"]

dependencies = [
"taskiq>=0.10.0,<1.0.0",
"taskiq>=0.11.0,<0.12.0",
"faststream>=0.3.14,<0.6.0",
]

Expand All @@ -59,6 +59,10 @@ kafka = [
"faststream[kafka]"
]

confluent = [
"faststream[confluent]"
]

redis = [
"faststream[redis]"
]
Expand All @@ -68,6 +72,7 @@ test = [
"taskiq-faststream[nats]",
"taskiq-faststream[rabbit]",
"taskiq-faststream[kafka]",
"taskiq-faststream[confluent]",
"taskiq-faststream[redis]",

"coverage[toml]>=7.2.0,<8.0.0",
Expand All @@ -77,7 +82,7 @@ test = [
dev = [
"taskiq-faststream[test]",

"mypy>=1.8.0,<1.10.0",
"mypy>=1.8.0,<1.12.0",
"ruff==0.4.1",
"pre-commit >=3.6.0,<4.0.0",
]
Expand Down
3 changes: 2 additions & 1 deletion taskiq_faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""FastStream - taskiq integration to schedule FastStream tasks."""
__version__ = "0.1.8"

__version__ = "0.2.0"
22 changes: 9 additions & 13 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import anyio
from faststream.app import FastStream
from faststream.types import SendableMessage
from taskiq import AsyncBroker, BrokerMessage
from taskiq import AsyncBroker
from taskiq.acks import AckableMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from typing_extensions import TypeAlias, override

from taskiq_faststream.serializer import PatchedSerializer
from taskiq_faststream.formatter import PatchedFormatter, PathcedMessage
from taskiq_faststream.types import ScheduledTask
from taskiq_faststream.utils import resolve_msg

Expand All @@ -33,7 +33,7 @@ class BrokerWrapper(AsyncBroker):

def __init__(self, broker: Any) -> None:
super().__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter()
self.broker = broker

async def startup(self) -> None:
Expand All @@ -46,7 +46,7 @@ async def shutdown(self) -> None:
await self.broker.close()
await super().shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
await _broker_publish(self.broker, message)

Expand Down Expand Up @@ -109,7 +109,7 @@ class AppWrapper(BrokerWrapper):

def __init__(self, app: FastStream) -> None:
super(BrokerWrapper, self).__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter()
self.app = app

async def startup(self) -> None:
Expand All @@ -122,7 +122,7 @@ async def shutdown(self) -> None:
await self.app._shutdown() # noqa: SLF001
await super(BrokerWrapper, self).shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
assert ( # noqa: S101
self.app.broker
Expand All @@ -132,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None:

async def _broker_publish(
broker: Any,
message: BrokerMessage,
message: PathcedMessage,
) -> None:
labels = message.labels
labels.pop("schedule", None)
async for msg in resolve_msg(
msg=labels.pop("message", message.message),
):
await broker.publish(msg, **labels)
async for msg in resolve_msg(message.body):
await broker.publish(msg, **message.labels)
45 changes: 45 additions & 0 deletions taskiq_faststream/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from dataclasses import dataclass
from typing import Any, Dict

from taskiq.abc.formatter import TaskiqFormatter
from taskiq.message import TaskiqMessage


@dataclass
class PathcedMessage:
"""DTO to transfer data to `broker.kick`."""

body: Any
labels: Dict[str, Any]


class PatchedFormatter(TaskiqFormatter):
"""Default taskiq formatter."""

def dumps( # type: ignore[override]
self,
message: TaskiqMessage,
) -> PathcedMessage:
"""
Dumps taskiq message to some broker message format.

:param message: message to send.
:return: Dumped message.
"""
labels = message.labels
labels.pop("schedule", None)
labels.pop("schedule_id", None)

return PathcedMessage(
body=labels.pop("message", None),
labels=labels,
)

def loads(self, message: bytes) -> TaskiqMessage:
"""
Loads json from message.

:param message: broker's message.
:return: parsed taskiq message.
"""
raise NotImplementedError
17 changes: 0 additions & 17 deletions taskiq_faststream/serializer.py

This file was deleted.

1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ def mock() -> MagicMock:


@pytest.fixture()
@pytest.mark.anyio
async def event() -> asyncio.Event:
return asyncio.Event()
2 changes: 1 addition & 1 deletion tests/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def handler(msg: str) -> None:
**{self.subj_name: subject},
schedule=[
{
"time": datetime.utcnow(),
"time": datetime.utcnow(), # old python compat
},
],
)
Expand Down
Loading