by bryogenic 2115 days ago
It is! You just need to use asyncio correctly and put the coroutines you want to run concurrently in tasks. It's the second example in the asyncio documentation for coroutines and tasks: https://docs.python.org/3/library/asyncio-task.html#asyncio....
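For readers who haven't seen the pattern, here is a minimal sketch of putting coroutines in tasks so that they overlap. The names `say_after` and `log` are made up for illustration, and the delays are arbitrary.

```python
import asyncio

async def say_after(delay, what, log):
    # Record when this coroutine starts and finishes.
    log.append(f"start {what}")
    await asyncio.sleep(delay)
    log.append(f"end {what}")
    return what

async def main():
    log = []
    # create_task schedules each coroutine immediately, so both sleeps overlap
    # instead of running one after the other.
    task1 = asyncio.create_task(say_after(0.05, "hello", log))
    task2 = asyncio.create_task(say_after(0.15, "world", log))
    results = [await task1, await task2]
    return results, log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both "start" entries land in the log before either "end" entry, which is the sign that the two coroutines ran concurrently.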

Yeah, I was thinking along those lines. A more readable version of this library that I sketched out might be something like --

An eagerly executing coroutine consumes the original generator and puts its values in a queue, and a new simple generator yields values from the queue until it's told to stop.

I'm not sure if this matches the behavior of the original library for task cancellation though.

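    import asyncio

    def eager_gen(gen):
        # Call this from inside a running event loop: create_task needs one.
        end_marker = object()  # unique sentinel, compared by identity

        queue = asyncio.Queue()

        async def consumer():
            # Drain the source into the queue, then signal completion or failure.
            try:
                async for val in gen:
                    await queue.put(val)
            except Exception as e:
                await queue.put((end_marker, e))
            else:
                await queue.put((end_marker, None))

        # Start consuming right away, before anyone iterates over the result.
        task = asyncio.create_task(consumer())

        async def new_gen():
            while True:
                item = await queue.get()
                # Identity check, so a yielded tuple is never mistaken for the marker.
                if isinstance(item, tuple) and len(item) == 2 and item[0] is end_marker:
                    break
                yield item

            await task  # already finished; referencing it also keeps the task alive
            if item[1] is not None:
                raise item[1] from None

        return new_gen()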

[Edit] This would also make a good decorator.
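A rough sketch of what the decorator form could look like. This is my own illustration, not the library's API: `eager`, `numbers`, and `_END` are hypothetical names, and it ignores cancellation entirely.

```python
import asyncio
import functools

_END = object()  # sentinel marking the end of the stream

def eager(func):
    """Wrap an async generator function so it starts producing at call time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        source = func(*args, **kwargs)
        queue = asyncio.Queue()

        async def fill():
            # Every queue entry is a (value, exception) pair.
            try:
                async for value in source:
                    await queue.put((value, None))
            except Exception as exc:
                await queue.put((_END, exc))
            else:
                await queue.put((_END, None))

        # Requires a running event loop, so call the wrapper inside a coroutine.
        task = asyncio.create_task(fill())

        async def drain():
            while True:
                value, exc = await queue.get()
                if value is _END:
                    break
                yield value
            await task  # already done; also keeps a reference to the task
            if exc is not None:
                raise exc

        return drain()
    return wrapper

@eager
async def numbers(log):
    for i in range(3):
        log.append(f"produced {i}")
        yield i

async def main():
    log = []
    gen = numbers(log)       # the background task is created here
    await asyncio.sleep(0)   # give the background task a chance to run
    produced_before_iterating = list(log)
    values = [v async for v in gen]
    return produced_before_iterating, values
```

In `main`, all three items have already been produced before the `async for` starts, which is the "eager" part.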
Hmm, so you're saying that this library is not really needed? I'll look into this.
Hi, author here,

While yes, running a coroutine in a new task is in the standard library, that is only a component of what this library does, which is:

- given an async iterable,

- iterating over it in a new task, into a buffer,

- and then returning an iterable that yields values from this buffer as it's filled.

I don't _think_ this logic is in the standard library, at least not at https://docs.python.org/3/library/asyncio-task.html#asyncio.... from what I can see.

Yeah, this actually looks pretty cool. It's a nice way to abstract a common pattern that is otherwise quite tedious, namely spawning a bunch of tasks and having them feed each other work via asyncio queues.
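For comparison, this is roughly what the hand-written version of that pattern looks like. It's a sketch with made-up stage names (`produce`, `square`, `collect`) and a `DONE` sentinel that I'm assuming as the shutdown signal.

```python
import asyncio

DONE = object()  # sentinel passed down the pipeline to signal shutdown

async def produce(out_q):
    for i in range(5):
        await out_q.put(i)
    await out_q.put(DONE)

async def square(in_q, out_q):
    while True:
        item = await in_q.get()
        if item is DONE:
            await out_q.put(DONE)  # forward the shutdown signal
            return
        await out_q.put(item * item)

async def collect(in_q):
    results = []
    while True:
        item = await in_q.get()
        if item is DONE:
            return results
        results.append(item)

async def main():
    # Bounded queues give back-pressure between stages.
    q1 = asyncio.Queue(maxsize=2)
    q2 = asyncio.Queue(maxsize=2)
    producer = asyncio.create_task(produce(q1))
    worker = asyncio.create_task(square(q1, q2))
    results = await collect(q2)
    await asyncio.gather(producer, worker)  # surface any errors from the stages
    return results
```

Every stage has to know about the queues and the sentinel, which is the tedious part.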

Have you thought about making it work as a decorator?

Indirectly: the initial version was a single function, so it could have been used as a decorator easily enough.

The issue is how to handle exceptions so that tasks don't hang around forever. Exceptions propagate “forward” in the pipeline fairly automatically. Getting the tasks “behind” the exception to notice it was the tricky part. The best I came up with was the bit of state wrapped up in buffered_pipeline that notices the exceptions and cancels the tasks.
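To illustrate the problem in general terms (this is not how buffered_pipeline does it, just one generic approach using `asyncio.wait`), here is a sketch where a downstream failure cancels the upstream task that would otherwise block forever. All function names are hypothetical.

```python
import asyncio

async def endless_producer(queue):
    i = 0
    while True:
        await queue.put(i)  # blocks whenever the queue is full
        i += 1

async def failing_consumer(queue):
    while True:
        item = await queue.get()
        if item == 3:
            raise ValueError("bad item")

async def run_pipeline():
    queue = asyncio.Queue(maxsize=1)
    tasks = [
        asyncio.create_task(endless_producer(queue)),
        asyncio.create_task(failing_consumer(queue)),
    ]
    # Return as soon as any stage fails.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # The producer sits "behind" the failure and would wait on put() forever,
    # so cancel whatever is still running.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    error = None
    for task in done:
        if task.exception() is not None:
            error = task.exception()
    return error, tasks
```

Without the explicit cancel, the producer task would stay blocked on a full queue after the consumer died.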

I'm not saying the OP code isn't useful, but I do think that readers may be unaware of existing documented features, and jumping into a library to solve their perceived problems shortchanges the existing mechanisms.

I've gotten along ok with create_task and gather. I'll need to spend some time with this implementation to see if it fits any of my existing asyncio coding patterns.
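A minimal sketch of the gather side of that, assuming a stand-in coroutine called `fetch` (hypothetical, it only sleeps and returns a number):

```python
import asyncio

async def fetch(n):
    # Stand-in for real I/O: wait a little, then return a value.
    await asyncio.sleep(0.01 * n)
    return n * 2

async def main():
    # gather schedules all three coroutines concurrently and returns their
    # results in argument order, regardless of which one finishes first.
    return await asyncio.gather(fetch(3), fetch(1), fetch(2))
```

Here `main` returns the results in the order the coroutines were passed in, even though `fetch(1)` finishes first.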