How to pipeline data from one stream to another with multiple consumers #924
-
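Hello! Thanks for the great library!! I'm trying to pipeline data from one stream to another to implement a simple fan-out / map-reduce pattern. The data flow is quite simple: a producer sends items into a first stream, one or more map consumers forward those items into a second stream, and a reduce consumer reads from the second stream.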
This seems to work fine when I have a single consumer, for example:

import anyio
async def map_consumer(receive1, send2):
    """Forward every item read from ``receive1`` to ``send2``.

    Both streams are used as async context managers, so ``receive1`` is
    exited first (when iteration ends) and ``send2`` is exited afterwards.
    """
    async with send2:
        async with receive1:
            async for item in receive1:
                print("map consumer - ", item)
                await send2.send(item)
        # Printed once the inner context (receive1) has been exited.
        print("closed receive1")
    # Printed once the outer context (send2) has been exited.
    print("closed send2")
async def reduce_consumer(receive2):
    """Print every item read from ``receive2`` until iteration ends.

    ``receive2`` is used as an async context manager, so it is exited
    before the final message is printed.
    """
    async with receive2:
        async for item in receive2:
            print("reduce consumer - ", item)
    print("closed receive2")
async def producer(send1):
    """Send the integers 0 through 9 into ``send1``.

    ``send1`` is used as an async context manager, so it is exited once
    all ten items have been sent and before the final message is printed.
    """
    async with send1:
        for idx in range(10):
            print("producing - ", idx)
            await send1.send(idx)
    print("closed send1")
async def main():
    """Run the single-consumer pipeline: producer -> map -> reduce.

    Two memory object streams are created (each with a buffer of 10 items)
    and the three tasks are started in one task group; leaving the
    ``async with`` block waits for all of them to finish.
    """
    send1, receive1 = anyio.create_memory_object_stream(max_buffer_size=10)
    send2, receive2 = anyio.create_memory_object_stream(max_buffer_size=10)
    async with anyio.create_task_group() as tg:
        # start the producer
        tg.start_soon(producer, send1)
        # start the consumer
        tg.start_soon(map_consumer, receive1, send2)
        # start a second consumer
        tg.start_soon(reduce_consumer, receive2)
anyio.run(main)

However, when I add multiple map consumers, giving each one a `clone()` of the streams as shown below, it no longer works as expected:

async def main():
send1, receive1 = anyio.create_memory_object_stream(max_buffer_size=10)
send2, receive2 = anyio.create_memory_object_stream(max_buffer_size=10)
async with anyio.create_task_group() as tg:
# start the producer
tg.start_soon(producer, send1)
# start the consumer
for _ in range(2):
tg.start_soon(map_consumer, receive1.clone(), send2.clone())
# start a second consumer
tg.start_soon(reduce_consumer, receive2)

What am I doing wrong? Thanks for the help!
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
That's easy. You need to close the original streams, i.e. receive1 and send2.
Beta Was this translation helpful? Give feedback.
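That's easy. You need to close the original streams, i.e. `receive1` and `send2`. Each end of a memory object stream is only considered closed once all of its clones, including the original object, have been closed; as long as `main()` keeps the original `send2` open, the reduce consumer never sees the end of the stream.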