File tree Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Original file line number Diff line number Diff line change 3
3
import datetime
4
4
import random
5
5
from logging import getLogger
6
+ from types import NoneType
6
7
from typing import Any
7
8
8
9
from taskiq import ScheduleSource
@@ -61,6 +62,11 @@ def __init__(
61
62
self .max_delay_exponent = max_delay_exponent
62
63
self .schedule_source = schedule_source
63
64
65
+ if not isinstance (schedule_source , (ScheduleSource , NoneType )):
66
+ raise TypeError (
67
+ "schedule_source must be an instance of ScheduleSource or None" ,
68
+ )
69
+
64
70
def is_retry_on_error (self , message : TaskiqMessage ) -> bool :
65
71
"""
66
72
Check if retry is enabled for this task.
@@ -103,7 +109,9 @@ async def on_send(
103
109
delay : float ,
104
110
) -> None :
105
111
"""Execute the task with a delay."""
106
- if isinstance (self .schedule_source , ScheduleSource ):
112
+ if self .schedule_source is None :
113
+ await kicker .with_labels (delay = delay ).kiq (* message .args , ** message .kwargs )
114
+ else :
107
115
target_time = datetime .datetime .now (datetime .UTC ) + datetime .timedelta (
108
116
seconds = delay ,
109
117
)
@@ -113,8 +121,6 @@ async def on_send(
113
121
* message .args ,
114
122
** message .kwargs ,
115
123
)
116
- else :
117
- await kicker .with_labels (delay = delay ).kiq (* message .args , ** message .kwargs )
118
124
119
125
async def on_error (
120
126
self ,
You can’t perform that action at this time.
0 commit comments