Skip to content

Commit b8d9d74

Browse files
author
wushaobin
committed
Fixes #30 decrement received messages
1 parent 6e063ec commit b8d9d74

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

src/Consumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public function receive(bool $loop = true): Message
225225
$this->messageQueue->enqueue($message);
226226
}
227227

228-
$consumer->decrement(1);
228+
$consumer->decrement(sizeof($messages));
229229

230230
return $this->messageQueue->dequeue();
231231
}
@@ -358,4 +358,4 @@ protected function getPartitionConsumer(int $consumerID): PartitionConsumer
358358
return $this->consumers[ $consumerID ];
359359
}
360360

361-
}
361+
}

src/Reader.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public function next(): Message
136136
$this->messageQueue->enqueue($message);
137137
}
138138

139-
$consumer->decrement(1);
139+
$consumer->decrement(sizeof($messages));
140140

141141
return $this->messageQueue->dequeue();
142142
}
@@ -176,4 +176,4 @@ protected function getWaitSeconds(): int
176176
{
177177
return 30;
178178
}
179-
}
179+
}

0 commit comments

Comments
 (0)