Skip to content

Commit 4c3f0aa

Browse files
committed
Merge branch 'release/0.1.1'
2 parents ee98950 + 2c0148c commit 4c3f0aa

File tree

17 files changed

+567
-333
lines changed

17 files changed

+567
-333
lines changed

.flake8

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ ignore =
8888
; Consider possible security implications associated with pickle module
8989
; Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
9090
S403, S301
91+
; Found too many imported names from a module
92+
WPS235
9193

9294
per-file-ignores =
9395
; all tests
@@ -104,6 +106,10 @@ per-file-ignores =
104106
DAR101,
105107
; Found too many arguments
106108
WPS211,
109+
; Found nested function
110+
WPS430,
111+
; Found too short name
112+
WPS111,
107113

108114
; all init files
109115
__init__.py:

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: Testing taskiq-pipelines
22

3-
on: push
3+
on: pull_request
44

55
jobs:
66
black:

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ repos:
4141
language: system
4242
pass_filenames: false
4343
types: [python]
44-
args: [--count, taskiq_pipelines]
44+
args: [--count, taskiq_pipelines, tests]
4545

4646
- id: mypy
4747
name: Validate types with MyPy

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,14 @@ We have a few steps available for chaining calls:
138138
This type of step is just an ordinary call of the function.
139139
If you haven't specified `param_name` argument, then the result
140140
of the previous step will be passed as the first argument of the function.
141-
Uf you did specify the `param_name` argument, then the result of the previous
141+
If you did specify the `param_name` argument, then the result of the previous
142142
step can be found in key word arguments with the param name you specified.
143143

144144
You can add sequential steps with `.call_next` method of the pipeline.
145145

146+
If you don't want to pass the result of the previous step to the next one,
147+
you can use `.call_after` method of the pipeline.
148+
146149
### Mapper step
147150

148151
This step runs specified task for each item of the previous task's result spawning

poetry.lock

Lines changed: 392 additions & 297 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-pipelines"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
description = "Taskiq pipelines for task chaining."
55
authors = ["Pavel Kirilin <win10@list.ru>"]
66
readme = "README.md"
@@ -27,7 +27,7 @@ keywords = ["taskiq", "pipelines", "tasks", "distributed", "async"]
2727
python = "^3.8.1"
2828
taskiq = ">=0.0.8, <1"
2929
typing-extensions = "^4.3.0"
30-
pydantic = "^1.6.2"
30+
pydantic = "^2"
3131

3232
[tool.poetry.dev-dependencies]
3333
pytest = "^7"

taskiq_pipelines/abc.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
class AbstractStep(ABC):
1010
"""Abstract pipeline step."""
1111

12-
step_name: str
13-
known_steps: "Dict[str, Type[AbstractStep]]" = {}
12+
_step_name: str
13+
_known_steps: "Dict[str, Type[AbstractStep]]" = {}
1414

1515
def __init_subclass__(cls, step_name: str, **kwargs: Any) -> None:
1616
super().__init_subclass__(**kwargs)
1717
# Sets step name to the step.
18-
cls.step_name = step_name
18+
cls._step_name = step_name
1919
# Registers new subclass in the dict of
2020
# known steps.
21-
cls.known_steps[step_name] = cls
21+
cls._known_steps[step_name] = cls
2222

2323
@abstractmethod
2424
def dumps(self) -> str:

taskiq_pipelines/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1+
from typing import Literal
2+
13
CURRENT_STEP = "_pipe_current_step"
24
PIPELINE_DATA = "_pipe_data"
5+
6+
EMPTY_PARAM_NAME: Literal[-1] = -1

taskiq_pipelines/middleware.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ async def post_save( # noqa: C901, WPS212
4242
return
4343
pipeline_data = message.labels[PIPELINE_DATA]
4444
try:
45-
steps_data = pydantic.parse_raw_as(List[DumpedStep], pipeline_data)
45+
steps_data = pydantic.TypeAdapter(List[DumpedStep]).validate_json(
46+
pipeline_data,
47+
)
4648
except ValueError:
4749
return
4850
if current_step_num + 1 >= len(steps_data):
@@ -99,7 +101,7 @@ async def on_error(
99101
return
100102
pipe_data = message.labels[PIPELINE_DATA]
101103
try:
102-
steps = pydantic.parse_raw_as(List[DumpedStep], pipe_data)
104+
steps = pydantic.TypeAdapter(List[DumpedStep]).validate_json(pipe_data)
103105
except ValueError:
104106
return
105107
if current_step_num == len(steps) - 1:

taskiq_pipelines/pipeliner.py

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
import json
2-
from typing import Any, Coroutine, Generic, List, Optional, TypeVar, Union, overload
2+
from typing import (
3+
Any,
4+
Coroutine,
5+
Generic,
6+
List,
7+
Literal,
8+
Optional,
9+
TypeVar,
10+
Union,
11+
overload,
12+
)
313

414
import pydantic
515
from taskiq import AsyncBroker, AsyncTaskiqTask
616
from taskiq.decor import AsyncTaskiqDecoratedTask
717
from taskiq.kicker import AsyncKicker
818
from typing_extensions import ParamSpec
919

10-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
20+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
1121
from taskiq_pipelines.steps import FilterStep, MapperStep, SequentialStep, parse_step
1222

1323
_ReturnType = TypeVar("_ReturnType")
@@ -58,7 +68,7 @@ def call_next(
5868
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
5969
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
6070
],
61-
param_name: Optional[str] = None,
71+
param_name: Union[Optional[str], Literal[-1]] = None,
6272
**additional_kwargs: Any,
6373
) -> "Pipeline[_FuncParams, _T2]":
6474
...
@@ -70,7 +80,7 @@ def call_next(
7080
AsyncKicker[Any, _T2],
7181
AsyncTaskiqDecoratedTask[Any, _T2],
7282
],
73-
param_name: Optional[str] = None,
83+
param_name: Union[Optional[str], Literal[-1]] = None,
7484
**additional_kwargs: Any,
7585
) -> "Pipeline[_FuncParams, _T2]":
7686
...
@@ -81,7 +91,7 @@ def call_next(
8191
AsyncKicker[Any, Any],
8292
AsyncTaskiqDecoratedTask[Any, Any],
8393
],
84-
param_name: Optional[str] = None,
94+
param_name: Union[Optional[str], Literal[-1]] = None,
8595
**additional_kwargs: Any,
8696
) -> Any:
8797
"""
@@ -94,13 +104,14 @@ def call_next(
94104
if param_name is specified.
95105
96106
:param task: task to execute.
97-
:param param_name: kwarg param name, defaults to None
107+
:param param_name: kwarg param name, defaults to None.
108+
If set to -1 (EMPTY_PARAM_NAME), result is not passed.
98109
:param additional_kwargs: additional kwargs to task.
99110
:return: updated pipeline.
100111
"""
101112
self.steps.append(
102113
DumpedStep(
103-
step_type=SequentialStep.step_name,
114+
step_type=SequentialStep._step_name, # noqa: WPS437
104115
step_data=SequentialStep.from_task(
105116
task=task,
106117
param_name=param_name,
@@ -111,6 +122,62 @@ def call_next(
111122
)
112123
return self
113124

125+
@overload
126+
def call_after(
127+
self: "Pipeline[_FuncParams, _ReturnType]",
128+
task: Union[
129+
AsyncKicker[Any, Coroutine[Any, Any, _T2]],
130+
AsyncTaskiqDecoratedTask[Any, Coroutine[Any, Any, _T2]],
131+
],
132+
**additional_kwargs: Any,
133+
) -> "Pipeline[_FuncParams, _T2]":
134+
...
135+
136+
@overload
137+
def call_after(
138+
self: "Pipeline[_FuncParams, _ReturnType]",
139+
task: Union[
140+
AsyncKicker[Any, _T2],
141+
AsyncTaskiqDecoratedTask[Any, _T2],
142+
],
143+
**additional_kwargs: Any,
144+
) -> "Pipeline[_FuncParams, _T2]":
145+
...
146+
147+
def call_after(
148+
self,
149+
task: Union[
150+
AsyncKicker[Any, Any],
151+
AsyncTaskiqDecoratedTask[Any, Any],
152+
],
153+
**additional_kwargs: Any,
154+
) -> Any:
155+
"""
156+
Adds sequential step.
157+
158+
This task will be executed right after
159+
the previous and result of the previous task
160+
is not passed to the next task.
161+
162+
This is equivalent to call_next(task, param_name=-1).
163+
164+
:param task: task to execute.
165+
:param additional_kwargs: additional kwargs to task.
166+
:return: updated pipeline.
167+
"""
168+
self.steps.append(
169+
DumpedStep(
170+
step_type=SequentialStep._step_name, # noqa: WPS437
171+
step_data=SequentialStep.from_task(
172+
task=task,
173+
param_name=EMPTY_PARAM_NAME,
174+
**additional_kwargs,
175+
).dumps(),
176+
task_id="",
177+
),
178+
)
179+
return self
180+
114181
@overload
115182
def map(
116183
self: "Pipeline[_FuncParams, _ReturnType]",
@@ -169,7 +236,7 @@ def map(
169236
"""
170237
self.steps.append(
171238
DumpedStep(
172-
step_type=MapperStep.step_name,
239+
step_type=MapperStep._step_name, # noqa: WPS437
173240
step_data=MapperStep.from_task(
174241
task=task,
175242
param_name=param_name,
@@ -241,7 +308,7 @@ def filter(
241308
"""
242309
self.steps.append(
243310
DumpedStep(
244-
step_type=FilterStep.step_name,
311+
step_type=FilterStep._step_name, # noqa: WPS437
245312
step_data=FilterStep.from_task(
246313
task=task,
247314
param_name=param_name,
@@ -261,7 +328,7 @@ def dumps(self) -> str:
261328
:returns: serialized pipeline.
262329
"""
263330
return json.dumps(
264-
[step.dict() for step in self.steps],
331+
[step.model_dump() for step in self.steps],
265332
)
266333

267334
@classmethod
@@ -277,7 +344,7 @@ def loads(cls, broker: AsyncBroker, pipe_data: str) -> "Pipeline[Any, Any]":
277344
:return: new
278345
"""
279346
pipe: "Pipeline[Any, Any]" = Pipeline(broker)
280-
pipe.steps = pydantic.parse_raw_as(List[DumpedStep], pipe_data)
347+
pipe.steps = pydantic.TypeAdapter(List[DumpedStep]).validate_json(pipe_data)
281348
return pipe
282349

283350
async def kiq(

taskiq_pipelines/steps/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
def parse_step(step_type: str, step_data: str) -> AbstractStep:
13-
step_cls = AbstractStep.known_steps.get(step_type)
13+
step_cls = AbstractStep._known_steps.get(step_type) # noqa: WPS437
1414
if step_cls is None:
1515
logger.warning(f"Unknown step type: {step_type}")
1616
raise ValueError("Unknown step type.")

taskiq_pipelines/steps/filter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def dumps(self) -> str:
8484
8585
:return: returns json.
8686
"""
87-
return self.json()
87+
return self.model_dump_json()
8888

8989
@classmethod
9090
def loads(cls, data: str) -> "FilterStep":
@@ -94,7 +94,7 @@ def loads(cls, data: str) -> "FilterStep":
9494
:param data: dumped data.
9595
:return: parsed step.
9696
"""
97-
return pydantic.parse_raw_as(FilterStep, data)
97+
return pydantic.TypeAdapter(FilterStep).validate_json(data)
9898

9999
async def act(
100100
self,

taskiq_pipelines/steps/mapper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def dumps(self) -> str:
8181
8282
:return: returns json.
8383
"""
84-
return self.json()
84+
return self.model_dump_json()
8585

8686
@classmethod
8787
def loads(cls, data: str) -> "MapperStep":
@@ -91,7 +91,7 @@ def loads(cls, data: str) -> "MapperStep":
9191
:param data: dumped data.
9292
:return: parsed step.
9393
"""
94-
return pydantic.parse_raw_as(MapperStep, data)
94+
return pydantic.TypeAdapter(MapperStep).validate_json(data)
9595

9696
async def act(
9797
self,

taskiq_pipelines/steps/sequential.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from taskiq.kicker import AsyncKicker
66

77
from taskiq_pipelines.abc import AbstractStep
8-
from taskiq_pipelines.constants import CURRENT_STEP, PIPELINE_DATA
8+
from taskiq_pipelines.constants import CURRENT_STEP, EMPTY_PARAM_NAME, PIPELINE_DATA
99

1010

1111
class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
@@ -19,7 +19,9 @@ class SequentialStep(pydantic.BaseModel, AbstractStep, step_name="sequential"):
1919

2020
task_name: str
2121
labels: Dict[str, str]
22-
param_name: Optional[str]
22+
# order is important here, otherwise pydantic will always choose str.
23+
# we use int instead of Literal[-1] because pydantic thinks that -1 is always str.
24+
param_name: Union[Optional[int], str]
2325
additional_kwargs: Dict[str, Any]
2426

2527
def dumps(self) -> str:
@@ -28,7 +30,7 @@ def dumps(self) -> str:
2830
2931
:return: returns json.
3032
"""
31-
return self.json()
33+
return self.model_dump_json()
3234

3335
@classmethod
3436
def loads(cls, data: str) -> "SequentialStep":
@@ -38,7 +40,7 @@ def loads(cls, data: str) -> "SequentialStep":
3840
:param data: dumped data.
3941
:return: parsed step.
4042
"""
41-
return pydantic.parse_raw_as(SequentialStep, data)
43+
return pydantic.TypeAdapter(SequentialStep).validate_json(data)
4244

4345
async def act(
4446
self,
@@ -78,9 +80,11 @@ async def act(
7880
**{PIPELINE_DATA: pipe_data, CURRENT_STEP: step_number}, # type: ignore
7981
)
8082
)
81-
if self.param_name:
83+
if isinstance(self.param_name, str):
8284
self.additional_kwargs[self.param_name] = result.return_value
8385
await kicker.kiq(**self.additional_kwargs)
86+
elif self.param_name == EMPTY_PARAM_NAME:
87+
await kicker.kiq(**self.additional_kwargs)
8488
else:
8589
await kicker.kiq(result.return_value, **self.additional_kwargs)
8690

@@ -91,7 +95,7 @@ def from_task(
9195
AsyncKicker[Any, Any],
9296
AsyncTaskiqDecoratedTask[Any, Any],
9397
],
94-
param_name: Optional[str],
98+
param_name: Union[Optional[str], int],
9599
**additional_kwargs: Any,
96100
) -> "SequentialStep":
97101
"""

taskiq_pipelines/tests/test_stub.py

Lines changed: 0 additions & 3 deletions
This file was deleted.

0 commit comments

Comments
 (0)