Skip to content

Commit 9988837

Browse files
Add b | batches option for batch command consume (#41)
* Added -b to add a specific number of batches to be consumed set in command
1 parent 5542098 commit 9988837

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

Command/BatchConsumerCommand.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ final class BatchConsumerCommand extends BaseRabbitMqCommand
1616
*/
1717
protected $consumer;
1818

19+
/** @var int */
20+
protected $amount;
21+
1922
public function stopConsumer()
2023
{
2124
if ($this->consumer instanceof BatchConsumer) {
@@ -36,6 +39,7 @@ protected function configure()
3639
$this
3740
->setName('rabbitmq:batch:consumer')
3841
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
42+
->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0)
3943
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
4044
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
4145
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
@@ -52,7 +56,7 @@ protected function configure()
5256
*
5357
* @return integer 0 if everything went fine, or an error code
5458
*
55-
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
59+
* @throws \InvalidArgumentException When the number of batches to consume is less than 0
5660
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
5761
*/
5862
protected function execute(InputInterface $input, OutputInterface $output)
@@ -74,9 +78,15 @@ protected function execute(InputInterface $input, OutputInterface $output)
7478
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
7579
}
7680

81+
$this->amount = (int) $input->getOption('batches');
82+
83+
if (0 > $this->amount) {
84+
throw new \InvalidArgumentException("The -b option should be null or greater than 0");
85+
}
86+
7787
$this->initConsumer($input);
7888

79-
return $this->consumer->consume();
89+
return $this->consumer->consume($this->amount);
8090
}
8191

8292
/**

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,8 @@ How to run the following batch consumer:
913913
```
914914

915915
Important: BatchConsumers will not have the -m|messages option available
916+
Important: BatchConsumers can also have the -b|batches option available if you want to only consume a specific number of batches and then stop the consumer.
917+
! Give the number of the batches only if you want the consumer to stop after those batch messages were consumed.!
916918

917919
### STDIN Producer ###
918920

RabbitMq/BatchConsumer.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ class BatchConsumer extends BaseAmqp implements DequeuerInterface
5959
*/
6060
protected $batchCounter = 0;
6161

62+
/**
63+
* @var int
64+
*/
65+
protected $batchAmount = 0;
66+
6267
/**
6368
* @var int
6469
*/
@@ -70,6 +75,9 @@ class BatchConsumer extends BaseAmqp implements DequeuerInterface
7075
*/
7176
protected $gracefulMaxExecutionDateTime;
7277

78+
/** @var int */
79+
private $target;
80+
7381
/**
7482
* @param \DateTime|null $dateTime
7583
*/
@@ -98,8 +106,10 @@ public function setCallback($callback)
98106
return $this;
99107
}
100108

101-
public function consume()
109+
public function consume(int $batchAmount = 0)
102110
{
111+
$this->target = $batchAmount;
112+
103113
$this->setupConsumer();
104114

105115
while (count($this->getChannel()->callbacks)) {
@@ -117,6 +127,7 @@ public function consume()
117127
} catch (AMQPTimeoutException $e) {
118128
if (!$this->isEmptyBatch()) {
119129
$this->batchConsume();
130+
$this->maybeStopConsumer();
120131
} elseif ($this->keepAlive === true) {
121132
continue;
122133
} elseif (null !== $this->getIdleTimeoutExitCode()) {
@@ -174,6 +185,7 @@ private function batchConsume()
174185
throw $e;
175186
}
176187

188+
$this->batchAmount++;
177189
$this->resetBatch();
178190
}
179191

@@ -211,6 +223,7 @@ private function handleProcessFlag($deliveryTag, $processFlag)
211223
// Remove message from queue only if callback return not false
212224
$this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag);
213225
}
226+
214227
}
215228

216229
/**
@@ -359,7 +372,7 @@ protected function maybeStopConsumer()
359372
pcntl_signal_dispatch();
360373
}
361374

362-
if ($this->forceStop) {
375+
if ($this->forceStop || ($this->batchAmount == $this->target && $this->target > 0)) {
363376
$this->stopConsuming();
364377
}
365378

0 commit comments

Comments
 (0)