Yeah, I was thinking along those lines. A more readable version of this library, as I sketched it out, might look like this: an eagerly executing coroutine consumes the original generator and puts its values on a queue, and a new, simple generator yields values from the queue until it's told to stop.
I'm not sure whether this matches the original library's behavior on task cancellation, though.
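    import asyncio

    def eager_gen(gen):
        # Must be called from inside a running event loop (create_task needs one).
        end_marker = object()
        queue = asyncio.Queue()

        async def consumer():
            # Drain the source right away. Every queue entry is a (tag, payload)
            # pair, so a tuple yielded by gen can't be mistaken for the end marker.
            try:
                async for val in gen:
                    await queue.put((None, val))
            except Exception as e:
                await queue.put((end_marker, e))
            else:
                await queue.put((end_marker, None))

        # Keep a strong reference: the event loop only holds weak references to tasks.
        task = asyncio.create_task(consumer())

        async def new_gen():
            while True:
                tag, payload = await queue.get()
                if tag is end_marker:
                    break
                yield payload
            await task  # the consumer is done by now; this also keeps the task referenced
            if payload is not None:
                raise payload from None

        return new_gen()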
Yeah, this actually looks pretty cool. It's a nice way to abstract a common pattern that is otherwise quite tedious: spawning a bunch of tasks and having them feed each other work via asyncio queues.
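For anyone who hasn't written that pattern by hand, here is a rough sketch of what it tends to look like. Everything in it (the `_DONE` sentinel, the `produce`/`double`/`collect` stage names, the queue size of 2) is made up for illustration and is not taken from the library.

```python
import asyncio

_DONE = object()  # hypothetical sentinel marking the end of the stream


async def produce(out_q, items):
    # First stage: push the inputs, then signal completion.
    for item in items:
        await out_q.put(item)
    await out_q.put(_DONE)


async def double(in_q, out_q):
    # Middle stage: transform each item and pass the sentinel along.
    while (item := await in_q.get()) is not _DONE:
        await out_q.put(item * 2)
    await out_q.put(_DONE)


async def collect(in_q):
    # Last stage: gather results until the sentinel arrives.
    results = []
    while (item := await in_q.get()) is not _DONE:
        results.append(item)
    return results


async def run_pipeline(items):
    # Small bounded queues give back-pressure between the stages.
    q1, q2 = asyncio.Queue(maxsize=2), asyncio.Queue(maxsize=2)
    _, _, results = await asyncio.gather(
        produce(q1, items), double(q1, q2), collect(q2)
    )
    return results
```

Each stage has to remember to forward the sentinel, and an exception in one stage leaves the others blocked on their queues, which is most of the tedium.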
Have you thought about making it work as a decorator?
Indirectly: the initial version was a single function, so it could easily have been used as a decorator.
The issue is what happens on exceptions, so that tasks don't hang around forever. Exceptions propagate “forward” in the pipeline fairly automatically. Getting the tasks “behind” the exception to notice was the tricky part. The best I came up with was the bit of state wrapped up in buffered_pipeline, which notices the exceptions and cancels the tasks.
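To make the "behind" problem concrete, here is one possible way to get an upstream task to notice that its consumer has gone away. This is only a sketch and not how buffered_pipeline does it: the name `eager_gen_cancelling` is hypothetical, and it assumes the consumer calls `aclose()` or runs the generator to completion.

```python
import asyncio


def eager_gen_cancelling(gen):
    """Hypothetical variant: buffer `gen` in a background task and cancel
    that task as soon as the downstream generator is finished or closed."""
    end_marker = object()
    queue = asyncio.Queue()

    async def producer():
        try:
            async for val in gen:
                await queue.put((None, val))
        except Exception as e:
            await queue.put((end_marker, e))
        else:
            await queue.put((end_marker, None))

    # Started eagerly; needs a running event loop.
    task = asyncio.create_task(producer())

    async def new_gen():
        try:
            while True:
                tag, payload = await queue.get()
                if tag is end_marker:
                    if payload is not None:
                        raise payload
                    return
                yield payload
        finally:
            # Runs on normal exhaustion, on an upstream error, and on aclose().
            task.cancel()
            # Wait for the cancellation to land; return_exceptions=True keeps
            # the resulting CancelledError from propagating here.
            await asyncio.gather(task, return_exceptions=True)

    return new_gen()
```

Breaking out of an `async for` loop does not close the generator by itself, so the consumer still needs an explicit `await agen.aclose()` for the cancellation to happen promptly. If the returned generator is never iterated at all, the `finally` block never runs and the task is left alone.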
I'm not saying the OP's code isn't useful, but I do think readers may be unaware of existing, documented features, and jumping to a library to solve a perceived problem shortchanges the existing mechanisms.
I've gotten along ok with create_task and gather. I'll need to spend some time with this implementation to see if it fits any of my existing asyncio coding patterns.
[Edit] This would also make a good decorator.
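A minimal sketch of the decorator idea, assuming the wrapping function takes an async generator object and returns a new one, as eager_gen does. The names `as_decorator`, `passthrough`, and `countdown` are hypothetical, and `passthrough` is only a stand-in so the example runs on its own.

```python
import asyncio
import functools


def as_decorator(transform):
    """Turn a function that wraps an async generator *object* into a
    decorator for async generator *functions*."""
    def decorate(agen_func):
        @functools.wraps(agen_func)
        def wrapper(*args, **kwargs):
            # Build the original generator, then hand it to the transform.
            return transform(agen_func(*args, **kwargs))
        return wrapper
    return decorate


async def passthrough(gen):
    # Stand-in transform for the demo; it just re-yields every value.
    async for val in gen:
        yield val


@as_decorator(passthrough)
async def countdown(n):
    while n > 0:
        yield n
        n -= 1


async def main():
    return [v async for v in countdown(3)]
```

Passing eager_gen instead of `passthrough` should work the same way, with the caveat that the decorated function would then have to be called from inside a running event loop, because the background task is created at call time.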