# Online Python - IDE, Editor, Compiler, Interpreter
import asyncio
import sys
import typing
from collections .abc import Awaitable, Iterable
ConsumerId = int
tasks: dict [ ConsumerId, Awaitable] = { }
sem = asyncio.Semaphore ( 4 )
async def consumer( data, consumer_id: ConsumerId) :
try :
print ( f'Starting consumer {consumer_id}' )
print ( f'Working on "{data}" ...' )
await asyncio.sleep ( 2 )
print ( f'Exiting consumer {consumer_id}' )
finally :
sem.release ( )
# If we exited cleanly, just remove ourselves from `tasks` dict because nobody needs to await us.
if not sys .exc_info ( ) [ 0 ] :
del tasks[ consumer_id]
async def main( producer: Iterable) :
for consumer_id, data in enumerate ( producer) :
await sem.acquire ( )
consumer_task = asyncio.create_task ( consumer( data, consumer_id = consumer_id) )
tasks[ consumer_id] = consumer_task
# At this point just await all the remaining tasks which are either still running or had exited with exception
for coro in asyncio.as_completed ( tasks.values ( ) ) :
try :
await coro
except Exception as exc:
print ( f'Do something about {exc}' )
producer = range ( 100 )
asyncio.run ( main( producer = producer) )
CiMgT25saW5lIFB5dGhvbiAtIElERSwgRWRpdG9yLCBDb21waWxlciwgSW50ZXJwcmV0ZXIKCmltcG9ydCBhc3luY2lvCmltcG9ydCBzeXMKaW1wb3J0IHR5cGluZwpmcm9tIGNvbGxlY3Rpb25zLmFiYyBpbXBvcnQgQXdhaXRhYmxlLCBJdGVyYWJsZQoKCkNvbnN1bWVySWQgPSBpbnQKCnRhc2tzOiBkaWN0W0NvbnN1bWVySWQsIEF3YWl0YWJsZV0gPSB7fQpzZW0gPSBhc3luY2lvLlNlbWFwaG9yZSg0KQoKYXN5bmMgZGVmIGNvbnN1bWVyKGRhdGEsIGNvbnN1bWVyX2lkOiBDb25zdW1lcklkKToKICAgIHRyeToKICAgICAgICBwcmludChmJ1N0YXJ0aW5nIGNvbnN1bWVyIHtjb25zdW1lcl9pZH0nKQogICAgICAgIHByaW50KGYnV29ya2luZyBvbiAie2RhdGF9IiAuLi4nKQogICAgICAgIGF3YWl0IGFzeW5jaW8uc2xlZXAoMikKICAgICAgICBwcmludChmJ0V4aXRpbmcgY29uc3VtZXIge2NvbnN1bWVyX2lkfScpCiAgICBmaW5hbGx5OgogICAgICAgIHNlbS5yZWxlYXNlKCkKICAgICAgICAjIElmIHdlIGV4aXRlZCBjbGVhbmx5LCBqdXN0IHJlbW92ZSBvdXJzZWx2ZXMgZnJvbSBgdGFza3NgIGRpY3QgYmVjYXVzZSBub2JvZHkgbmVlZHMgdG8gYXdhaXQgdXMuCiAgICAgICAgaWYgbm90IHN5cy5leGNfaW5mbygpWzBdOgogICAgICAgICAgICBkZWwgdGFza3NbY29uc3VtZXJfaWRdCgphc3luYyBkZWYgbWFpbihwcm9kdWNlcjogSXRlcmFibGUpOgogICAgZm9yIGNvbnN1bWVyX2lkLCBkYXRhIGluIGVudW1lcmF0ZShwcm9kdWNlcik6CiAgICAgICAgYXdhaXQgc2VtLmFjcXVpcmUoKQogICAgICAgIGNvbnN1bWVyX3Rhc2sgPSBhc3luY2lvLmNyZWF0ZV90YXNrKGNvbnN1bWVyKGRhdGEsIGNvbnN1bWVyX2lkID0gY29uc3VtZXJfaWQpKQogICAgICAgIHRhc2tzW2NvbnN1bWVyX2lkXSA9IGNvbnN1bWVyX3Rhc2sKCiAgICAjIEF0IHRoaXMgcG9pbnQganVzdCBhd2FpdCBhbGwgdGhlIHJlbWFpbmluZyB0YXNrcyB3aGljaCBhcmUgZWl0aGVyIHN0aWxsIHJ1bm5pbmcgb3IgaGFkIGV4aXRlZCB3aXRoIGV4Y2VwdGlvbgogICAgZm9yIGNvcm8gaW4gYXN5bmNpby5hc19jb21wbGV0ZWQodGFza3MudmFsdWVzKCkpOgogICAgICAgIHRyeToKICAgICAgICAgICAgYXdhaXQgY29ybwogICAgICAgIGV4Y2VwdCBFeGNlcHRpb24gYXMgZXhjOgogICAgICAgICAgICBwcmludChmJ0RvIHNvbWV0aGluZyBhYm91dCB7ZXhjfScpCgpwcm9kdWNlciA9IHJhbmdlKDEwMCkKYXN5bmNpby5ydW4obWFpbihwcm9kdWNlciA9IHByb2R1Y2VyKSk=
stdout
Starting consumer 0
Working on "0" ...
Starting consumer 1
Working on "1" ...
Starting consumer 2
Working on "2" ...
Starting consumer 3
Working on "3" ...
stderr
Traceback (most recent call last):
File "./prog.py", line 41, in <module>
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "./prog.py", line 29, in main
File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
await fut
RuntimeError: Task <Task pending name='Task-1' coro=<main() running at ./prog.py:29> cb=[_run_until_complete_cb() at /usr/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop