import asyncio
import itertools
import threading
class AsyncLoopThread:
def __init__(self, name='no name'):
self.thread = None
self.name = name
self._loop = None
def run(self):
""" spawn a new thread and run an event loop in it """
if self.thread is None:
self.thread = threading.Thread(target=lambda: asyncio.run(self._main_runner()))
self.thread.start()
return self.thread
else:
raise RuntimeError
("thread already running")
async def _main_runner(self):
""" main asyncio task for event loop """
# Сохраняем event loop в объекте, технической необходимости вызывать
# get_running_loop нет, event loop на этот момент уже создан при вызове asyncio.run
self._loop = asyncio.get_running_loop()
# Этот объект-событие надо выставить, чтобы тормознуть цикл обработки
# Так делать чуть более кошерно, чем loop.close(), можно корректно завершить разные задачи
self._evt_exit = asyncio.Event()
print(f"async-event-loop with name '{self.name}' started")
# Эту задачу мы будем крутить в фоне, какая-то полезная активность
async def _task():
for counter in itertools.count():
print(f"background async runner '{self.name}', counter: {counter}")
await asyncio.sleep(0.5)
bg_task = asyncio.create_task(_task())
await self._evt_exit.wait()
bg_task.cancel()
print(f"task '{self.name}' received stop-event and will be closed")
def add_task(self, coro):
""" add task to event loop in another thread
Метод для добавления задачи в цикл обработки сообщения из другого треда.
Здесь важно, что если мы из другого треда хотим добавить задачу, то мы не
можем просто так вызвать loop.create_task()
эта операция не сохраняет поток, как большинство других. Есть специальный
метод call_soon_threadsave для этих целен
С помощью этого метота можно поместить какой-то синхронный коллбэк, не корутину,
в event loop. А уже в этом коллбэке мы можем добавить задачу-корутину в event loop
"""
if self._loop:
self._loop.call_soon_threadsafe(lambda: self._loop.create_task(coro))
def main():
async_thread = AsyncLoopThread('async-thread')
t = async_thread.run()
print("Number of running threads: {}".format(threading.active_count()))
print("main thread id: 0x{:016x}".format(threading.get_ident()))
# main() исполняется в основном потоке, тогда как event loop в другомю
# эту корутину мы поместим в другой поток, она что-то напечатает, поспит,
# после чего остановит второй поток
async def other_coro(obj):
print("this coroutine is scheduled from the main thread, sleeping a little bit")
print("event loop thread id: 0x{:016x}".format(threading.get_ident()))
await asyncio.sleep(2)
print("wake up and stop the event loop")
obj._evt_exit.set()
async_thread.add_task(other_coro(async_thread))
print("coroutine was schedule in event loop thread")
# Ждём, когда завершится тред с event loop
t.join()
print("all work done")
if __name__ == '__main__':
main()