fork(1) download
  1. import asyncio
  2. import itertools
  3. import threading
  4. import time
  5.  
  6.  
  7. class AsyncLoopThread:
  8. def __init__(self, name='no name'):
  9. self.thread = None
  10. self.name = name
  11. self._loop = None
  12.  
  13. def run(self):
  14. """ spawn a new thread and run an event loop in it """
  15. if self.thread is None:
  16. self.thread = threading.Thread(target=lambda: asyncio.run(self._main_runner()))
  17. self.thread.start()
  18. return self.thread
  19. else:
  20. raise RuntimeError("thread already running")
  21.  
  22. async def _main_runner(self):
  23. """ main asyncio task for event loop """
  24.  
  25. # Сохраняем event loop в объекте, технической необходимости вызывать
  26. # get_running_loop нет, event loop на этот момент уже создан при вызове asyncio.run
  27. self._loop = asyncio.get_running_loop()
  28.  
  29. # Этот объект-событие надо выставить, чтобы тормознуть цикл обработки
  30. # Так делать чуть более кошерно, чем loop.close(), можно корректно завершить разные задачи
  31. self._evt_exit = asyncio.Event()
  32. print(f"async-event-loop with name '{self.name}' started")
  33. # Эту задачу мы будем крутить в фоне, какая-то полезная активность
  34. async def _task():
  35. for counter in itertools.count():
  36. print(f"background async runner '{self.name}', counter: {counter}")
  37. await asyncio.sleep(0.5)
  38.  
  39. bg_task = asyncio.create_task(_task())
  40. await self._evt_exit.wait()
  41. bg_task.cancel()
  42. print(f"task '{self.name}' received stop-event and will be closed")
  43.  
  44.  
  45. def add_task(self, coro):
  46. """ add task to event loop in another thread
  47.  
  48. Метод для добавления задачи в цикл обработки сообщения из другого треда.
  49. Здесь важно, что если мы из другого треда хотим добавить задачу, то мы не
  50. можем просто так вызвать loop.create_task()
  51. эта операция не сохраняет поток, как большинство других. Есть специальный
  52. метод call_soon_threadsave для этих целен
  53. С помощью этого метота можно поместить какой-то синхронный коллбэк, не корутину,
  54. в event loop. А уже в этом коллбэке мы можем добавить задачу-корутину в event loop
  55. """
  56. if self._loop:
  57. self._loop.call_soon_threadsafe(lambda: self._loop.create_task(coro))
  58.  
  59. def main():
  60. async_thread = AsyncLoopThread('async-thread')
  61. t = async_thread.run()
  62. time.sleep(3.3)
  63.  
  64. print("Number of running threads: {}".format(threading.active_count()))
  65. print("main thread id: 0x{:016x}".format(threading.get_ident()))
  66.  
  67. # main() исполняется в основном потоке, тогда как event loop в другомю
  68. # эту корутину мы поместим в другой поток, она что-то напечатает, поспит,
  69. # после чего остановит второй поток
  70. async def other_coro(obj):
  71. print("this coroutine is scheduled from the main thread, sleeping a little bit")
  72. print("event loop thread id: 0x{:016x}".format(threading.get_ident()))
  73. await asyncio.sleep(2)
  74. print("wake up and stop the event loop")
  75. obj._evt_exit.set()
  76.  
  77. async_thread.add_task(other_coro(async_thread))
  78. print("coroutine was schedule in event loop thread")
  79.  
  80. # Ждём, когда завершится тред с event loop
  81. t.join()
  82. print("all work done")
  83.  
  84.  
  85. if __name__ == '__main__':
  86. main()
  87.  
Success #stdin #stdout 0.11s 16056KB
stdin
Standard input is empty
stdout
async-event-loop with name 'async-thread' started
background async runner 'async-thread', counter: 0
background async runner 'async-thread', counter: 1
background async runner 'async-thread', counter: 2
background async runner 'async-thread', counter: 3
background async runner 'async-thread', counter: 4
background async runner 'async-thread', counter: 5
background async runner 'async-thread', counter: 6
Number of running threads: 2
main thread id: 0x000014cf9c119740
coroutine was schedule in event loop thread
this coroutine is scheduled from the main thread, sleeping a little bit
event loop thread id: 0x000014cf9b440700
background async runner 'async-thread', counter: 7
background async runner 'async-thread', counter: 8
background async runner 'async-thread', counter: 9
background async runner 'async-thread', counter: 10
wake up and stop the event loop
task 'async-thread' received stop-event and will be closed
all work done