File tree

5 files changed

+274
-7
lines changed

5 files changed

+274
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ MicroPython's `asyncio` when used in a microcontroller context.
2626
6.1 [Encoder class](./DRIVERS.md#61-encoder-class)
2727
7. [Ringbuf Queue](./DRIVERS.md#7-ringbuf-queue) A MicroPython optimised queue primitive.
2828
8. [Delay_ms class](./DRIVERS.md#8-delay_ms-class) A flexible retriggerable delay with callback or Event interface.
29-
9. [Additional functions](./DRIVERS.md#9-additional-functions)
30-
9.1 [launch](./DRIVERS.md#91-launch) Run a coro or callback interchangeably.
31-
9.2 [set_global_exception](./DRIVERS.md#92-set_global_exception) Simplify debugging with a global exception handler.
29+
9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between
30+
tasks.
31+
9.1 [Further examples](./DRIVERS.md#91-further-examples)
32+
9.2 [User agents](./DRIVERS.md#92-user-agents)
33+
10. [Additional functions](./DRIVERS.md#10-additional-functions)
34+
10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably.
35+
10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler.
3236

33-
###### [Tutorial](./TUTORIAL.md#contents)
37+
###### [asyncio Tutorial](./TUTORIAL.md#contents)
3438

3539
# 1. Introduction
3640

@@ -1126,9 +1130,116 @@ finally:
11261130
```
11271131
###### [Contents](./DRIVERS.md#0-contents)
11281132

1129-
# 9. Additional functions
1133+
# 9. Message Broker
1134+
1135+
This is under development: please check for updates.
1136+
1137+
The `Broker` class provides a flexible means of messaging between running tasks.
1138+
It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task
1139+
publishes to a topic. Any tasks subscribed to that topic will receive the
1140+
message. This enables one to one, one to many or many to many messaging.
1141+
1142+
A task subscribes to a topic with an `agent`. This is stored by the broker. When
1143+
the broker publishes a message, the `agent` of each task subscribed to its topic
1144+
will be triggered. In the simplest case the `agent` is a `Queue` instance: the
1145+
broker puts the topic and message onto the subscriber's queue for retrieval.
1146+
1147+
More advanced agents can perform actions in response to a message, such as
1148+
calling a function or launching a `task`.
1149+
1150+
Broker methods. All are synchronous, constructor has no args:
1151+
* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a
1152+
matching `topic`.
1153+
* `unsubscribe(topic, agent)` The `agent` will stop being triggered.
1154+
* `publish(topic, message)` All `agent` instances subscribed to `topic` will be
1155+
triggered, receiving `topic` and `message` args. Returns `True` unless a `Queue`
1156+
agent has become full, in which case data for that queue has been lost.
1157+
1158+
The `topic` arg is typically a string but may be any hashable object. A
1159+
`message` is an arbitrary Python object. An `agent` may be any of the following:
1160+
* `Queue` When a message is received receives 2-tuple `(topic, message)`.
1161+
* `function` Called when a message is received. Gets 2 args, topic and message.
1162+
* `bound method` Called when a message is received. Gets 2 args, topic and
1163+
message.
1164+
* `coroutine` Task created when a message is received with 2 args, topic and
1165+
message.
1166+
* `bound coroutine` Task created when a message is received with 2 args, topic
1167+
and message.
1168+
* Instance of a user class. See user agents below.
1169+
* `Event` Set when a message is received.
1170+
1171+
Note that synchronous `agent` instances must run to completion quickly otherwise
1172+
the `publish` method will be slowed.
1173+
1174+
The following is a simple example:
1175+
```py
1176+
import asyncio
1177+
from primitives import Broker, Queue
1178+
1179+
broker = Broker()
1180+
queue = Queue()
1181+
async def sender(t):
1182+
for x in range(t):
1183+
await asyncio.sleep(1)
1184+
broker.publish("foo_topic", f"test {x}")
1185+
1186+
async def main():
1187+
broker.subscribe("foo_topic", queue)
1188+
n = 10
1189+
asyncio.create_task(sender(n))
1190+
print("Letting queue part-fill")
1191+
await asyncio.sleep(5)
1192+
for _ in range(n):
1193+
topic, message = await queue.get()
1194+
print(topic, message)
1195+
1196+
asyncio.run(main())
1197+
```
1198+
## 9.1 Further examples
1199+
1200+
An interesting application is to extend MQTT into the Python code
1201+
(see [mqtt_as](https://.com/peterhinch/micropython-mqtt/tree/master)).
1202+
This is as simple as:
1203+
```py
1204+
async def messages(client):
1205+
async for topic, msg, retained in client.queue:
1206+
broker.publish(topic.decode(), msg.decode())
1207+
```
1208+
Assuming the MQTT client is subscribed to multiple topics, message strings are
1209+
directed to individual tasks each supporting one topic.
1210+
1211+
## 9.2 User agents
1212+
1213+
An `agent` can be an instance of a user class. The class must be a subclass of
1214+
`Agent`, and it must support a synchronous `.put` method. The latter takes two
1215+
args, being `topic` and `message`. It should run to completion quickly.
1216+
1217+
```py
1218+
import asyncio
1219+
from primitives import Broker, Agent
1220+
1221+
broker = Broker()
1222+
class MyAgent(Agent):
1223+
def put(sef, topic, message):
1224+
print(f"User agent. Topic: {topic} Message: {message}")
1225+
1226+
async def sender(t):
1227+
for x in range(t):
1228+
await asyncio.sleep(1)
1229+
broker.publish("foo_topic", f"test {x}")
1230+
1231+
async def main():
1232+
broker.subscribe("foo_topic", MyAgent())
1233+
await sender(10)
1234+
1235+
asyncio.run(main())
1236+
```
1237+
1238+
###### [Contents](./DRIVERS.md#0-contents)
1239+
1240+
# 10. Additional functions
11301241

1131-
## 9.1 Launch
1242+
## 10.1 Launch
11321243

11331244
Import as follows:
11341245
```python
@@ -1140,7 +1251,7 @@ runs it and returns the callback's return value. If a coro is passed, it is
11401251
converted to a `task` and run asynchronously. The return value is the `task`
11411252
instance. A usage example is in `primitives/switch.py`.
11421253

1143-
## 9.2 set_global_exception
1254+
## 10.2 set_global_exception
11441255

11451256
Import as follows:
11461257
```python
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ def _handle_exception(loop, context):
5353
"RingbufQueue": "ringbuf_queue",
5454
"Keyboard": "sw_array",
5555
"SwArray": "sw_array",
56+
"Broker": "broker",
57+
"Agent": "broker",
5658
}
5759

5860
# Copied from uasyncio.__init__.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# broker.py A message broker for MicroPython
2+
3+
# Copyright (c) 2024 Peter Hinch
4+
# Released under the MIT License (MIT) - see LICENSE file
5+
6+
# Inspired by the following
7+
# https://www.joeltok.com/posts/2021-03-building-an-event-bus-in-python/
8+
9+
import asyncio
10+
from primitives import Queue, type_coro
11+
12+
13+
class Agent:
14+
pass
15+
16+
17+
class Broker(dict):
18+
def subscribe(self, topic, agent):
19+
if not self.get(topic, False):
20+
self[topic] = {agent}
21+
else:
22+
self[topic].add(agent)
23+
24+
def unsubscribe(self, topic, agent):
25+
try:
26+
self[topic].remove(agent)
27+
if len(self[topic]) == 0:
28+
del self[topic]
29+
except KeyError:
30+
pass # Topic already removed
31+
32+
def publish(self, topic, message):
33+
agents = self.get(topic, [])
34+
result = True
35+
for agent in agents:
36+
if isinstance(agent, asyncio.Event):
37+
agent.set()
38+
continue
39+
if isinstance(agent, Agent): # User class
40+
agent.put(topic, message) # Must support .put
41+
continue
42+
if isinstance(agent, Queue):
43+
if agent.full():
44+
result = False
45+
else:
46+
agent.put_nowait((topic, message))
47+
continue
48+
# agent is function, method, coroutine or bound coroutine
49+
res = agent(topic, message)
50+
if isinstance(res, type_coro):
51+
asyncio.create_task(res)
52+
return result
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
["primitives/__init__.py", ":peterhinch/micropython-async/v3/primitives/__init__.py"],
44
["primitives/aadc.py", ":peterhinch/micropython-async/v3/primitives/aadc.py"],
55
["primitives/barrier.py", ":peterhinch/micropython-async/v3/primitives/barrier.py"],
6+
["primitives/broker.py", ":peterhinch/micropython-async/v3/primitives/broker.py"],
67
["primitives/condition.py", ":peterhinch/micropython-async/v3/primitives/condition.py"],
78
["primitives/delay_ms.py", ":peterhinch/micropython-async/v3/primitives/delay_ms.py"],
89
["primitives/encoder.py", ":peterhinch/micropython-async/v3/primitives/encoder.py"],
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# broker_test.py Test various types of subscriber
2+
3+
# import primitives.tests.broker_test
4+
5+
import asyncio
6+
from primitives import Broker, Queue
7+
8+
broker = Broker()
9+
10+
# Periodically publish messages to two topics
11+
async def test(t):
12+
for x in range(t):
13+
await asyncio.sleep(1)
14+
broker.publish("foo_topic", f"dogs {x}")
15+
broker.publish("bar_topic", f"rats {x}")
16+
17+
18+
# Suscribe via coroutine
19+
async def subs(topic, message):
20+
await asyncio.sleep_ms(100)
21+
print("coroutine", topic, message)
22+
23+
24+
# Subscribe via function
25+
def func(topic, message):
26+
print("function", topic, message)
27+
28+
29+
# Subscribe via Event
30+
31+
event = asyncio.Event()
32+
33+
34+
async def event_test():
35+
while True:
36+
await event.wait()
37+
event.clear()
38+
print("Event triggered")
39+
40+
41+
class TestClass:
42+
async def fetch_data(self, topic, message):
43+
await asyncio.sleep_ms(100)
44+
print("bound coro", topic, message)
45+
46+
def get_data(self, topic, message):
47+
print("bound method", topic, message)
48+
49+
50+
async def print_queue(q):
51+
while True:
52+
topic, message = await q.get()
53+
print(topic, message)
54+
55+
56+
async def main():
57+
tc = TestClass()
58+
q = Queue(10)
59+
print("Subscribing Event, coroutine, Queue and bound coroutine.")
60+
broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine
61+
broker.subscribe("bar_topic", subs) # Coroutine
62+
broker.subscribe("bar_topic", event)
63+
broker.subscribe("foo_topic", q)
64+
65+
asyncio.create_task(test(30)) # Publish to topics for 30s
66+
asyncio.create_task(event_test())
67+
await asyncio.sleep(5)
68+
print()
69+
print("Unsubscribing coroutine")
70+
broker.unsubscribe("bar_topic", subs)
71+
await asyncio.sleep(5)
72+
print()
73+
print("Unsubscribing Event")
74+
broker.unsubscribe("bar_topic", event)
75+
print()
76+
print("Subscribing function")
77+
broker.subscribe("bar_topic", func)
78+
await asyncio.sleep(5)
79+
print()
80+
print("Unsubscribing function")
81+
broker.unsubscribe("bar_topic", func)
82+
print()
83+
print("Unsubscribing bound coroutine")
84+
broker.unsubscribe("foo_topic", tc.fetch_data) # Async method
85+
print()
86+
print("Subscribing method")
87+
broker.subscribe("foo_topic", tc.get_data) # Sync method
88+
await asyncio.sleep(5)
89+
print()
90+
print("Unsubscribing method")
91+
broker.unsubscribe("foo_topic", tc.get_data) # Async method
92+
print("Pause 5s")
93+
await asyncio.sleep(5)
94+
print("Retrieving foo_topic messages from queue")
95+
try:
96+
await asyncio.wait_for(print_queue(q), 5)
97+
except asyncio.TimeoutError:
98+
print("Done")
99+
100+
101+
asyncio.run(main())

0 commit comments

Comments
 (0)