File tree Expand file tree Collapse file tree 2 files changed +39
-0
lines changed Expand file tree Collapse file tree 2 files changed +39
-0
lines changed Original file line number Diff line number Diff line change @@ -165,6 +165,7 @@ public function pop($subscriber = null)
165165 'returnImmediately ' => true ,
166166 'maxMessages ' => 1 ,
167167 ]);
168+ $ messages = $ this ->checkAvailabilty ($ messages );
168169
169170 if (!empty ($ messages ) && count ($ messages ) > 0 ) {
170171 return new PubSubJob (
@@ -178,6 +179,18 @@ public function pop($subscriber = null)
178179 }
179180 }
180181
182+ public function checkAvailabilty ($ messages ) {
183+ if (!empty ($ messages ) && count ($ messages ) > 0 ) {
184+ $ availableAt = $ messages [0 ]->attribute ('available_at ' );
185+ $ now = (new \DateTime ('now ' ))->getTimestamp ();
186+ if (!$ availableAt ) {
187+ return $ messages ;
188+ } else if ($ availableAt < $ now ) {
189+ return $ messages ;
190+ }
191+ }
192+ }
193+
181194 /**
182195 * Push an array of jobs onto the queue.
183196 *
Original file line number Diff line number Diff line change @@ -298,4 +298,30 @@ public function testGetPubSub()
298298 {
299299 $ this ->assertTrue ($ this ->queue ->getPubSub () instanceof PubSubClient);
300300 }
301+
302+ public function testDelayedMessage ()
303+ {
304+ $ this ->subscription ->method ('pull ' )
305+ ->willReturn ([$ this ->message ]);
306+
307+ $ this ->message ->method ('attribute ' )
308+ ->with ($ this ->equalTo ('available_at ' ))
309+ ->willReturn ((new \DateTime ('now +1seconds ' ))->getTimestamp ());
310+
311+ $ this ->topic ->method ('subscription ' )
312+ ->willReturn ($ this ->subscription );
313+
314+ $ this ->topic ->method ('exists ' )
315+ ->willReturn (true );
316+
317+ $ this ->queue ->method ('getTopic ' )
318+ ->willReturn ($ this ->topic );
319+
320+ $ this ->queue ->setContainer ($ this ->createMock (Container::class));
321+
322+ $ this ->assertTrue (is_null ($ this ->queue ->pop ('test ' )));
323+ sleep (2 );
324+ $ this ->assertTrue ($ this ->queue ->pop ('test ' ) instanceof PubSubJob);
325+
326+ }
301327}
You can’t perform that action at this time.
0 commit comments