@@ -85,6 +85,7 @@ def __init__(self,
85
85
self .kwargs = kwargs
86
86
87
87
self .task = None # placeholder for the repeat task created in self.__wrapper
88
+ self .__next_run_time = None # placeholder for the current target run time
88
89
89
90
def __call__ (self , func : CoroFunction ):
90
91
return self .__wrapper (func )
@@ -102,19 +103,24 @@ async def wrapped() -> None:
102
103
async def inner ():
103
104
# maybe wait for next trigger cycle
104
105
if not self .on_startup :
105
- next_run = self .next_run
106
+ self . __next_run_time = self .next_run
106
107
if self .logger :
107
108
self .logger .info (
108
109
f'`on_startup` is set to `False`. First run of { self .__class__ .__name__ } for '
109
- f'{ func .__name__ } : { next_run .isoformat ()} '
110
+ f'{ func .__name__ } : { self . __next_run_time .isoformat ()} '
110
111
)
111
- await self .sleep_until (next_run )
112
+ await self .sleep_until (self . __next_run_time )
112
113
113
114
# repeat indefinitely
114
115
while True :
115
116
if self .logger :
116
117
self .logger .info (f'Running { self .__class__ .__name__ } for { func .__name__ } ' )
117
118
119
+ # safeguard against early triggers - apparently, a desync is possible
120
+ now = datetime .now ().astimezone ()
121
+ if self .__next_run_time is not None and now < self .__next_run_time :
122
+ await asyncio .sleep (max ((self .__next_run_time - now ).total_seconds (), 0 ))
123
+
118
124
# call the decorated function
119
125
try :
120
126
if self .iter_args :
@@ -130,17 +136,18 @@ async def inner():
130
136
await self .__handle_exception (func , None , e )
131
137
132
138
# sleep until next execution time
133
- next_run = self .next_run
134
- if self .logger and datetime .now ().astimezone () <= next_run :
139
+ self . __next_run_time = self .next_run
140
+ if self .logger and datetime .now ().astimezone () <= self . __next_run_time :
135
141
self .logger .info (
136
- f'{ self .__class__ .__name__ } finished for { func .__name__ } . Next run: { next_run .isoformat ()} '
142
+ f'{ self .__class__ .__name__ } finished for { func .__name__ } . '
143
+ f'Next run: { self .__next_run_time .isoformat ()} '
137
144
)
138
145
elif self .logger : # i.e. next_run is in the past
139
146
self .logger .warning (
140
147
f'{ self .__class__ .__name__ } missed the scheduled run time for { func .__name__ } . Running now'
141
148
)
142
149
143
- await self .sleep_until (next_run )
150
+ await self .sleep_until (self . __next_run_time )
144
151
145
152
# create a reference to the repeating task to prevent it from accidentally being garbage collected
146
153
self .task = self .loop .create_task (inner ())
0 commit comments