From de3fff0d6fb54dbaa411a537559772e7ecc521de Mon Sep 17 00:00:00 2001 From: Mark Jones Date: Thu, 18 Jul 2019 20:56:00 +0100 Subject: [PATCH] Added optional key parameter to publish --- README.md | 1 + examples/KafkaPublishExample.php | 2 ++ src/KafkaPubSubAdapter.php | 11 +++++------ tests/KafkaPubSubAdapterTest.php | 25 +++++++++++++++++++++++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index bf0fb3c..34b13c2 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ $adapter->publish('my_channel', 'HELLO WORLD'); $adapter->publish('my_channel', ['hello' => 'world']); $adapter->publish('my_channel', 1); $adapter->publish('my_channel', false); +$adapter->publish('my_channel', ['hello' => 'world'], 1); // publish multiple messages $messages = [ diff --git a/examples/KafkaPublishExample.php b/examples/KafkaPublishExample.php index 198a6b1..ff3c324 100644 --- a/examples/KafkaPublishExample.php +++ b/examples/KafkaPublishExample.php @@ -31,3 +31,5 @@ for ($x = 0; $x < 10; $x++) { $adapter->publish('my_channel', $x); } + +$adapter->publish('my_channel', 'With a key', 1); \ No newline at end of file diff --git a/src/KafkaPubSubAdapter.php b/src/KafkaPubSubAdapter.php index 5d38e23..79e7893 100644 --- a/src/KafkaPubSubAdapter.php +++ b/src/KafkaPubSubAdapter.php @@ -91,15 +91,14 @@ public function subscribe($channel, callable $handler) } /** - * Publish a message to a channel. - * - * @param string $channel - * @param mixed $message + * @param $channel + * @param $message + * @param null $key */ - public function publish($channel, $message) + public function publish($channel, $message, $key = null) { $topic = $this->producer->newTopic($channel); - $topic->produce(RD_KAFKA_PARTITION_UA, 0, Utils::serializeMessage($message)); + $topic->produce(RD_KAFKA_PARTITION_UA, 0, Utils::serializeMessage($message), $key); } /** diff --git a/tests/KafkaPubSubAdapterTest.php b/tests/KafkaPubSubAdapterTest.php index 3c64006..66e84d9 100644 --- a/tests/KafkaPubSubAdapterTest.php +++ b/tests/KafkaPubSubAdapterTest.php @@ -256,6 +256,31 @@ public function testPublish() $adapter->publish('channel_name', ['hello' => 'world']); } + public function testPublishWithKey() + { + $topic = Mockery::mock(\RdKafka\Topic::class); + $topic->shouldReceive('produce') + ->withArgs([ + RD_KAFKA_PARTITION_UA, + 0, + '{"id:1", "hello":"world"}', + 1 + ]) + ->once(); + + $producer = Mockery::mock(\RdKafka\Producer::class); + $producer->shouldReceive('newTopic') + ->with('channel_name') + ->once() + ->andReturn($topic); + + $consumer = Mockery::mock(\RdKafka\KafkaConsumer::class); + + $adapter = new KafkaPubSubAdapter($producer, $consumer); + + $adapter->publish('channel_name', ['id' => 1, 'hello' => 'world'], 1); + } + public function testPublishBatch() { $topic = Mockery::mock(\RdKafka\Topic::class);