Skip to content

Commit af1fc00

Browse files
authored
Pass matched topic wildcards to subscription callback (#128)
1 parent 92290fb commit af1fc00

File tree

6 files changed

+70
-30
lines changed

6 files changed

+70
-30
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ $clientId = 'test-subscriber';
6060

6161
$mqtt = new \PhpMqtt\Client\MqttClient($server, $port, $clientId);
6262
$mqtt->connect();
63-
$mqtt->subscribe('php-mqtt/client/test', function ($topic, $message) {
63+
$mqtt->subscribe('php-mqtt/client/test', function ($topic, $message, $retained, $matchedWildcards) {
6464
echo sprintf("Received message on topic [%s]: %s\n", $topic, $message);
6565
}, 0);
6666
$mqtt->loop(true);
@@ -80,7 +80,7 @@ pcntl_signal(SIGINT, function (int $signal, $info) use ($mqtt) {
8080
$mqtt->interrupt();
8181
});
8282
$mqtt->connect();
83-
$mqtt->subscribe('php-mqtt/client/test', function ($topic, $message) {
83+
$mqtt->subscribe('php-mqtt/client/test', function ($topic, $message, $retained, $matchedWildcards) {
8484
echo sprintf("Received message on topic [%s]: %s\n", $topic, $message);
8585
}, 0);
8686
$mqtt->loop(true);

src/Contracts/MqttClient.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,13 @@ public function publish(string $topic, string $message, int $qualityOfService =
7676
*
7777
* The subscription callback is passed the topic as first and the message as second
7878
* parameter. A third parameter indicates whether the received message has been sent
79-
* because it was retained by the broker.
79+
* because it was retained by the broker. A fourth parameter contains matched topic wildcards.
8080
*
8181
* Example:
8282
* ```php
8383
* $mqtt->subscribe(
8484
* '/foo/bar/+',
85-
* function (string $topic, string $message, bool $retained) use ($logger) {
85+
* function (string $topic, string $message, bool $retained, array $matchedWildcards) use ($logger) {
8686
* $logger->info("Received {retained} message on topic [{topic}]: {message}", [
8787
* 'topic' => $topic,
8888
* 'message' => $message,

src/MqttClient.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ protected function deliverPublishedMessage(string $topic, string $message, int $
941941
}
942942

943943
try {
944-
call_user_func($subscriber->getCallback(), $topic, $message, $retained);
944+
call_user_func($subscriber->getCallback(), $topic, $message, $retained, $subscriber->getMatchedWildcards($topic));
945945
} catch (\Throwable $e) {
946946
$this->logger->error('Subscriber callback threw exception for published message on topic [{topic}].', [
947947
'topic' => $topic,

src/Repositories/MemoryRepository.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public function getSubscriptionsMatchingTopic(string $topicName): array
202202
$result = [];
203203

204204
foreach ($this->subscriptions as $subscription) {
205-
if ($topicName !== null && !$subscription->matchesTopic($topicName)) {
205+
if (!$subscription->matchesTopic($topicName)) {
206206
continue;
207207
}
208208

src/Subscription.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private function regexifyTopicFilter(): void
4949
$topicFilter = substr($topicFilter, $separatorIndex + 1);
5050
}
5151

52-
$this->regexifiedTopicFilter = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '[^\/]*', '.*'], $topicFilter) . '$/';
52+
$this->regexifiedTopicFilter = '/^' . str_replace(['$', '/', '+', '#'], ['\$', '\/', '([^\/]*)', '(.*)'], $topicFilter) . '$/';
5353
}
5454

5555
/**
@@ -73,6 +73,30 @@ public function matchesTopic(string $topicName): bool
7373
return (bool) preg_match($this->regexifiedTopicFilter, $topicName);
7474
}
7575

76+
/**
77+
* Returns an array which contains all matched wildcards of this subscription, taken from the given topic name.
78+
*
79+
* Example:
80+
* Subscription topic filter: foo/+/bar/+/baz/#
81+
* Result for 'foo/1/bar/2/baz': ['1', '2']
82+
* Result for 'foo/my/bar/subscription/baz/42': ['my', 'subscription', '42']
83+
* Result for 'foo/my/bar/subscription/baz/hello/world/123': ['my', 'subscription', 'hello', 'world', '123']
84+
* Result for invalid topic 'some/topic': []
85+
*
86+
* Note: This method should only be called if {@see matchesTopic} returned true. An empty array will be returned otherwise.
87+
*
88+
* @param string $topicName
89+
* @return array
90+
*/
91+
public function getMatchedWildcards(string $topicName): array
92+
{
93+
if (!preg_match($this->regexifiedTopicFilter, $topicName, $matches)) {
94+
return [];
95+
}
96+
97+
return array_slice($matches, 1);
98+
}
99+
76100
/**
77101
* Returns the callback for this subscription.
78102
*

tests/Feature/PublishSubscribeTest.php

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ class PublishSubscribeTest extends TestCase
2020
public function publishSubscribeData(): array
2121
{
2222
return [
23-
['test/foo/bar/baz', 'test/foo/bar/baz', 'hello world'],
24-
['test/foo/bar/+', 'test/foo/bar/baz', 'hello world'],
25-
['test/foo/+/baz', 'test/foo/bar/baz', 'hello world'],
26-
['test/foo/#', 'test/foo/bar/baz', 'hello world'],
27-
['test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024)], // 2MB message
23+
['test/foo/bar/baz', 'test/foo/bar/baz', 'hello world', []],
24+
['test/foo/bar/+', 'test/foo/bar/baz', 'hello world', ['baz']],
25+
['test/foo/+/baz', 'test/foo/bar/baz', 'hello world', ['bar']],
26+
['test/foo/#', 'test/foo/bar/baz', 'hello world', ['bar/baz']],
27+
['test/foo/+/bar/#', 'test/foo/my/bar/baz', 'hello world', ['my', 'baz']],
28+
['test/foo/+/bar/#', 'test/foo/my/bar/baz/blub', 'hello world', ['my', 'baz/blub']],
29+
['test/foo/bar/baz', 'test/foo/bar/baz', random_bytes(2 * 1024 * 1024), []], // 2MB message
2830
];
2931
}
3032

@@ -34,21 +36,27 @@ public function publishSubscribeData(): array
3436
public function test_publishing_and_subscribing_using_quality_of_service_0_works_as_intended(
3537
string $subscriptionTopicFilter,
3638
string $publishTopic,
37-
string $publishMessage
39+
string $publishMessage,
40+
array $matchedTopicWildcards
3841
): void
3942
{
4043
// We connect and subscribe to a topic using the first client.
4144
$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
4245
$subscriber->connect(null, true);
4346

44-
$subscriber->subscribe($subscriptionTopicFilter, function (string $topic, string $message, bool $retained) use ($subscriber, $publishTopic, $publishMessage) {
45-
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
46-
$this->assertEquals($publishTopic, $topic);
47-
$this->assertEquals($publishMessage, $message);
48-
$this->assertFalse($retained);
47+
$subscriber->subscribe(
48+
$subscriptionTopicFilter,
49+
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
50+
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
51+
$this->assertEquals($publishTopic, $topic);
52+
$this->assertEquals($publishMessage, $message);
53+
$this->assertFalse($retained);
54+
$this->assertEquals($matchedTopicWildcards, $wildcards);
4955

50-
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
51-
}, 0);
56+
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
57+
},
58+
0
59+
);
5260

5361
// We publish a message from a second client on the same topic.
5462
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
@@ -70,21 +78,27 @@ public function test_publishing_and_subscribing_using_quality_of_service_0_works
7078
public function test_publishing_and_subscribing_using_quality_of_service_1_works_as_intended(
7179
string $subscriptionTopicFilter,
7280
string $publishTopic,
73-
string $publishMessage
81+
string $publishMessage,
82+
array $matchedTopicWildcards
7483
): void
7584
{
7685
// We connect and subscribe to a topic using the first client.
7786
$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
7887
$subscriber->connect(null, true);
7988

80-
$subscriber->subscribe($subscriptionTopicFilter, function (string $topic, string $message, bool $retained) use ($subscriber, $publishTopic, $publishMessage) {
81-
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
82-
$this->assertEquals($publishTopic, $topic);
83-
$this->assertEquals($publishMessage, $message);
84-
$this->assertFalse($retained);
89+
$subscriber->subscribe(
90+
$subscriptionTopicFilter,
91+
function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $publishTopic, $publishMessage, $matchedTopicWildcards) {
92+
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
93+
$this->assertEquals($publishTopic, $topic);
94+
$this->assertEquals($publishMessage, $message);
95+
$this->assertFalse($retained);
96+
$this->assertEquals($matchedTopicWildcards, $wildcards);
8597

86-
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
87-
}, 1);
98+
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.
99+
},
100+
1
101+
);
88102

89103
// We publish a message from a second client on the same topic.
90104
$publisher = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'publisher');
@@ -106,18 +120,20 @@ public function test_publishing_and_subscribing_using_quality_of_service_1_works
106120
public function test_publishing_and_subscribing_using_quality_of_service_2_works_as_intended(
107121
string $subscriptionTopicFilter,
108122
string $publishTopic,
109-
string $publishMessage
123+
string $publishMessage,
124+
array $matchedTopicWildcards
110125
): void
111126
{
112127
// We connect and subscribe to a topic using the first client.
113128
$subscriber = new MqttClient($this->mqttBrokerHost, $this->mqttBrokerPort, 'subscriber');
114129
$subscriber->connect(null, true);
115130

116-
$subscription = function (string $topic, string $message, bool $retained) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage) {
131+
$subscription = function (string $topic, string $message, bool $retained, array $wildcards) use ($subscriber, $subscriptionTopicFilter, $publishTopic, $publishMessage, $matchedTopicWildcards) {
117132
// By asserting something here, we will avoid a no-assertions-in-test warning, making the test pass.
118133
$this->assertEquals($publishTopic, $topic);
119134
$this->assertEquals($publishMessage, $message);
120135
$this->assertFalse($retained);
136+
$this->assertEquals($matchedTopicWildcards, $wildcards);
121137

122138
$subscriber->unsubscribe($subscriptionTopicFilter);
123139
$subscriber->interrupt(); // This allows us to exit the test as soon as possible.

0 commit comments

Comments
 (0)