Skip to content

Commit 846f840

Browse files
committed
AmqpQueue was not deleting consumed messages
1 parent ac2abae commit 846f840

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/Adapters/AmqpQueue.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public function pop() : Event
7373
$this->declareQueue();
7474
$message = $this->amqpChannel->basic_get($this->queueName);
7575

76+
if (!empty($message)) {
77+
$this->amqpChannel->basic_ack($message->delivery_info['delivery_tag']);
78+
}
79+
7680
return ($message) ? $this->serializer->unserialize($message->body) : NullEvent::create();
7781
}
7882

@@ -85,10 +89,9 @@ public function hasElements(): bool
8589
{
8690
$hasElements = false;
8791

88-
$event = $this->pop();
89-
if (false === $event instanceof NullEvent) {
92+
$message = $this->amqpChannel->basic_get($this->queueName);
93+
if ($message) {
9094
$hasElements = true;
91-
$this->push($event);
9295
}
9396

9497
return $hasElements;

0 commit comments

Comments
 (0)