Skip to content

Commit 8945ee8

Browse files
authored
Merge pull request #444 from bkanuka/master
add example of using trio as external loop
2 parents 42f0b13 + c916ba1 commit 8945ee8

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

examples/loop_trio.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#!/usr/bin/env python3
2+
3+
import socket
4+
import uuid
5+
import paho.mqtt.client as mqtt
6+
import trio
7+
8+
client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
9+
topic = client_id
10+
print("Using client_id / topic: " + client_id)
11+
12+
13+
class TrioAsyncHelper:
14+
def __init__(self, client):
15+
self.client = client
16+
self.sock = None
17+
self._event_large_write = trio.Event()
18+
19+
self.client.on_socket_open = self.on_socket_open
20+
self.client.on_socket_register_write = self.on_socket_register_write
21+
self.client.on_socket_unregister_write = self.on_socket_unregister_write
22+
23+
async def read_loop(self):
24+
while True:
25+
await trio.hazmat.wait_readable(self.sock)
26+
self.client.loop_read()
27+
28+
async def write_loop(self):
29+
while True:
30+
await self._event_large_write.wait()
31+
await trio.hazmat.wait_writable(self.sock)
32+
self.client.loop_write()
33+
34+
async def misc_loop(self):
35+
print("misc_loop started")
36+
while self.client.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
37+
await trio.sleep(1)
38+
print("misc_loop finished")
39+
40+
def on_socket_open(self, client, userdata, sock):
41+
print("Socket opened")
42+
self.sock = sock
43+
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)
44+
45+
def on_socket_register_write(self, client, userdata, sock):
46+
print('large write request')
47+
self._event_large_write.set()
48+
49+
def on_socket_unregister_write(self, client, userdata, sock):
50+
print(f"finished large write")
51+
self._event_large_write = trio.Event()
52+
53+
54+
class TrioAsyncMqttExample:
55+
def on_connect(self, client, userdata, flags, rc):
56+
print("Subscribing")
57+
client.subscribe(topic)
58+
59+
def on_message(self, client, userdata, msg):
60+
print("Got response with {} bytes".format(len(msg.payload)))
61+
62+
def on_disconnect(self, client, userdata, rc):
63+
print(f'Disconnect result {rc}')
64+
65+
async def test_write(self, cancel_scope: trio.CancelScope):
66+
for c in range(3):
67+
await trio.sleep(5)
68+
print(f"Publishing")
69+
self.client.publish(topic, b'Hello' * 40000, qos=1)
70+
cancel_scope.cancel()
71+
72+
async def main(self):
73+
self.client = mqtt.Client(client_id=client_id)
74+
self.client.on_connect = self.on_connect
75+
self.client.on_message = self.on_message
76+
self.client.on_disconnect = self.on_disconnect
77+
78+
trio_helper = TrioAsyncHelper(self.client)
79+
80+
self.client.connect('mqtt.eclipse.org', 1883, 60)
81+
82+
async with trio.open_nursery() as nursery:
83+
nursery.start_soon(trio_helper.read_loop)
84+
nursery.start_soon(trio_helper.write_loop)
85+
nursery.start_soon(trio_helper.misc_loop)
86+
nursery.start_soon(self.test_write, nursery.cancel_scope)
87+
88+
self.client.disconnect()
89+
print("Disconnected")
90+
91+
92+
print("Starting")
93+
trio.run(TrioAsyncMqttExample().main)
94+
print("Finished")

0 commit comments

Comments
 (0)