fork(2) download
  1.  
  2. # Online Python - IDE, Editor, Compiler, Interpreter
  3.  
  4. import asyncio
  5. import sys
  6. import typing
  7. from collections.abc import Awaitable, Iterable
  8.  
  9.  
  10. ConsumerId = int
  11.  
  12. tasks: dict[ConsumerId, Awaitable] = {}
  13. sem = asyncio.Semaphore(4)
  14.  
  15. async def consumer(data, consumer_id: ConsumerId):
  16. try:
  17. print(f'Starting consumer {consumer_id}')
  18. print(f'Working on "{data}" ...')
  19. await asyncio.sleep(2)
  20. print(f'Exiting consumer {consumer_id}')
  21. finally:
  22. sem.release()
  23. # If we exited cleanly, just remove ourselves from `tasks` dict because nobody needs to await us.
  24. if not sys.exc_info()[0]:
  25. del tasks[consumer_id]
  26.  
  27. async def main(producer: Iterable):
  28. for consumer_id, data in enumerate(producer):
  29. await sem.acquire()
  30. consumer_task = asyncio.create_task(consumer(data, consumer_id = consumer_id))
  31. tasks[consumer_id] = consumer_task
  32.  
  33. # At this point just await all the remaining tasks which are either still running or had exited with exception
  34. for coro in asyncio.as_completed(tasks.values()):
  35. try:
  36. await coro
  37. except Exception as exc:
  38. print(f'Do something about {exc}')
  39.  
  40. producer = range(100)
  41. asyncio.run(main(producer = producer))
Runtime error #stdin #stdout #stderr 0.18s 28444KB
stdin
Standard input is empty
stdout
Starting consumer 0
Working on "0" ...
Starting consumer 1
Working on "1" ...
Starting consumer 2
Working on "2" ...
Starting consumer 3
Working on "3" ...
stderr
Traceback (most recent call last):
  File "./prog.py", line 41, in <module>
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "./prog.py", line 29, in main
  File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-1' coro=<main() running at ./prog.py:29> cb=[_run_until_complete_cb() at /usr/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop