Skip to content

Issue602 #605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 5, 2025
9 changes: 3 additions & 6 deletions lib/src/utility/mqtt_client_byte_buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,14 @@ class MqttByteBuffer {
typed.Uint8Buffer read(int count) {
if ((length < count) || (_position + count) > length) {
throw Exception(
'mqtt_client::ByteBuffer: The buffer did not have '
'mqtt_client::ByteBuffer::read: The buffer does not have '
'enough bytes for the read operation '
'length $length, count $count, position $_position, buffer $buffer',
);
}
final tmp = typed.Uint8Buffer();
tmp.addAll(buffer!.getRange(_position, _position + count));
_position += count;
final tmp2 = typed.Uint8Buffer();
tmp2.addAll(tmp);
return tmp2;
return typed.Uint8Buffer()
..addAll(buffer!.getRange(_position - count, _position));
}

/// Writes a byte to the current position in the buffer
Expand Down
54 changes: 54 additions & 0 deletions test/issues/issue602/main.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import 'dart:async';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

void main() async {
final client = MqttServerClient('localhost', 'dart_client1111');
client.port = 1883;
client.logging(on: false);
final connMess = MqttConnectMessage();

client.connectionMessage = connMess;

int i = 0;

client.onConnected = () {
print('Connected to broker');
client.subscribe('station1/all', MqttQos.atMostOnce);
};

client.onDisconnected = () {
print('Disconnected from broker');
};

try {
print('Connecting...');
final connResult = await client.connect();

if (connResult?.state == MqttConnectionState.connected) {
print('Connected successfully');

// Now it's safe to listen
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
i++;
print(i);

// Optional: read message payload
// final payload =
// MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
// print('Message received: $payload');
});
} else {
print('Connection failed - state is ${connResult?.state}');
client.disconnect();
}
} catch (e) {
print('Exception during connection: $e');
client.disconnect();
return;
}

// Keep running
await Future.delayed(Duration(days: 365));
}
62 changes: 62 additions & 0 deletions test/issues/issue602/mqtt_bench2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import time
import paho.mqtt.client as mqtt

BROKER = 'localhost'
PORT = 1883
TOPIC = "benchmark/test"
EXPECTED_MESSAGES = 1000 # Should match publisher MESSAGE_COUNT

received_count = 0
total_bytes = 0
start_time = None
end_time = None

def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker")
client.subscribe(TOPIC)
else:
print(f"Failed to connect, return code {rc}")

def on_message(client, userdata, msg):
global received_count, total_bytes, start_time, end_time

if received_count == 0:
start_time = time.time()

received_count += 1
total_bytes += len(msg.payload)

if received_count == EXPECTED_MESSAGES:
end_time = time.time()
client.loop_stop()

def benchmark_receiver():
global start_time, end_time, received_count, total_bytes

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(BROKER, PORT, keepalive=60)
client.loop_start()

# Wait max 20 seconds to receive all messages
timeout = time.time() + 20
while received_count < EXPECTED_MESSAGES and time.time() < timeout:
time.sleep(0.01)

if received_count < EXPECTED_MESSAGES:
print(f"Timeout! Received only {received_count} messages.")
else:
duration = end_time - start_time
mb_received = total_bytes / (1024 * 1024)
print(f"All {received_count} messages received.")
print(f"Total receiving time: {duration:.4f} seconds")
print(f"Receiving speed: {received_count / duration:.2f} messages/second")
print(f"Data receiving throughput: {mb_received / duration:.4f} MB/s")

client.disconnect()

if __name__ == "__main__":
benchmark_receiver()
55 changes: 55 additions & 0 deletions test/issues/issue602/mqtt_test2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import json
import random
import time
from datetime import datetime, timedelta, timezone
import paho.mqtt.client as mqtt

# MQTT Configuration
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "stations/tanks"

# Data generation
stations = 100
tanks_per_station = 1000
start_time = datetime.now(timezone.utc)

data = []

for station_id in range(1, stations + 1):
for tank_id in range(1, tanks_per_station + 1):
entry = {
"topic": f"station{station_id}/tank{tank_id}",
"value": random.randint(1000, 10000),
"timestamp": (start_time + timedelta(seconds=(station_id * tank_id) % 300)).isoformat() + "Z"
}
data.append(entry)

# Convert to JSON string
json_payload = json.dumps(data)
payload_size = len(json_payload.encode("utf-8"))
print(f"Generated {len(data)} records, payload size: {payload_size / 1024:.2f} KB")

# MQTT Publish
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
client.loop_start()
i=0
try:
while True:
start_time = time.time()
timestamp = time.time()

# Publish all topics as fast as possible (non-blocking like JS)
client.publish("station1/all", payload=json_payload, qos=0, retain=False)

# Calculate how long the publishing took
publish_time = time.time() - start_time
i=i+1
print(i)
time.sleep(0.1)

except KeyboardInterrupt:
client.loop_stop()
client.disconnect()

119 changes: 119 additions & 0 deletions test/issues/issue602/my_dart_project.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_buffers.dart';

const String broker = 'localhost';
const int port = 1883;
const String topic = 'station1/all';
const int expectedMessages = 1000;

int receivedCount = 0;
int totalBytes = 0;
DateTime? startTime;
DateTime? endTime;

Future<void> main() async {
final client = MqttServerClient(broker, '');

client.port = port;
client.logging(on: false);
client.keepAlivePeriod = 60;
client.onDisconnected = onDisconnected;
client.onConnected = onConnected;

final connMessage = MqttConnectMessage()
.withClientIdentifier('dart_mqtt_benchmark_client')
.startClean()
.withWillQos(MqttQos.atMostOnce);

client.connectionMessage = connMessage;

try {
await client.connect();
} catch (e) {
print('Connection failed - $e');
client.disconnect();
return;
}

if (client.connectionStatus!.state != MqttConnectionState.connected) {
print('Connection failed - status is ${client.connectionStatus!.state}');
client.disconnect();
return;
}

print('Connected to MQTT Broker');

final subResult = await client.subscribe(topic, MqttQos.atMostOnce);
if (subResult == null) {
print('Subscription failed');
client.disconnect();
return;
}
print('Subscribed to $topic');

client.updates!.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final recMess = c[0].payload as MqttPublishMessage;
final dynamic payloadData = recMess.payload.message;

List<int> byteList;

if (payloadData is Uint8Buffer) {
byteList = payloadData.toList();
} else if (payloadData is List<int>) {
byteList = payloadData;
} else if (payloadData is ByteBuffer) {
byteList = payloadData.asUint8List();
} else {
throw Exception('Unknown payload type: ${payloadData.runtimeType}');
}

final payload = utf8.decode(byteList);

if (receivedCount == 0) {
startTime = DateTime.now();
}

receivedCount++;
totalBytes += byteList.length;

if (receivedCount >= expectedMessages) {
endTime = DateTime.now();
client.disconnect();
}
});

// Wait for messages or timeout after 30 seconds
final timeout = DateTime.now().add(Duration(seconds: 30));

while (receivedCount < expectedMessages && DateTime.now().isBefore(timeout)) {
await Future.delayed(Duration(milliseconds: 50));
}

if (receivedCount < expectedMessages) {
print('Timeout! Received only $receivedCount messages.');
} else {
final duration = endTime!.difference(startTime!).inMilliseconds / 1000.0;
final mbReceived = totalBytes / (1024 * 1024);

print('All $receivedCount messages received.');
print('Total receiving time: ${duration.toStringAsFixed(4)} seconds');
print(
'Receiving speed: ${(receivedCount / duration).toStringAsFixed(2)} messages/second',
);
print(
'Data throughput: ${(mbReceived / duration).toStringAsFixed(4)} MB/s',
);
}
}

void onDisconnected() {
print('Disconnected from broker');
}

void onConnected() {
print('Connected callback');
}
32 changes: 32 additions & 0 deletions test/issues/issue602/timings.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

1-2025-05-31 11:21:29.136750 -- MqttConnection::onData
1-2025-05-31 11:21:29.137524 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-05-31 11:21:29.142359 -- MqttConnection::onData
1-2025-05-31 11:21:29.144607 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-05-31 11:21:29.147727 -- MqttConnection::onData
1-2025-05-31 11:21:29.362218 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681329
Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14}

1-2025-05-31 11:21:29.362432 -- MqttServerConnection::_onData - message available event fired

---> From top to bottom this is taking 225.468ms to receive this complete message.

---> With Byte buffer remove duplicate buffer fix

1-2025-06-04 11:06:12.850682 -- MqttConnection::onData
1-2025-06-04 11:06:12.850721 -- MqttConnection::onData - incoming adding data to message stream
1-2025-06-04 11:06:12.851453 -- MqttConnection::onData - incoming data added to message stream
1-2025-06-04 11:06:12.851587 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-04 11:06:12.857021 -- MqttConnection::onData
1-2025-06-04 11:06:12.857046 -- MqttConnection::onData - incoming adding data to message stream
1-2025-06-04 11:06:12.857835 -- MqttConnection::onData - incoming data added to message stream
1-2025-06-04 11:06:12.960579 -- MqttConnection::onData - about to shrink message stream
1-2025-06-04 11:06:12.965953 -- MqttConnection::onData - shrunk message stream
1-2025-06-04 11:06:12.965978 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681323
Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14}

1-2025-06-04 11:06:12.966009 -- MqttServerConnection::_onData - message available event fired

---> From top to bottom this is taking 102.744ms to receive this complete message.
4 changes: 2 additions & 2 deletions test/mqtt_client_message_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ void main() {
} on Exception catch (exception) {
expect(
exception.toString(),
'Exception: mqtt_client::ByteBuffer: The buffer did not have enough bytes for the read operation length 3, count 2, position 2, buffer [0, 2, 109]',
'Exception: mqtt_client::ByteBuffer::read: The buffer does not have enough bytes for the read operation length 3, count 2, position 2, buffer [0, 2, 109]',
);
raised = true;
}
Expand Down Expand Up @@ -455,7 +455,7 @@ void main() {
} on Exception catch (exception) {
expect(
exception.toString(),
'Exception: mqtt_client::ByteBuffer: The buffer did not have enough bytes for the read operation length 1, count 2, position 0, buffer [0]',
'Exception: mqtt_client::ByteBuffer::read: The buffer does not have enough bytes for the read operation length 1, count 2, position 0, buffer [0]',
);
raised = true;
}
Expand Down