fork download
  1. import threading
  2. import asyncio
  3. from concurrent.futures import Future
  4. import functools
  5.  
  6. class EventLoopOwner(threading.Thread):
  7. class __Properties:
  8. def __init__(self, loop, thread, evt_start, evt_stop):
  9. self.loop = loop
  10. self.thread = thread
  11. self.evt_start = evt_start
  12. self.evt_stop = evt_stop
  13.  
  14. def __init__(self):
  15. threading.Thread.__init__(self)
  16. self.__elo = self.__Properties(None, None, threading.Event(), threading.Event())
  17.  
  18. def run(self):
  19. self.__elo.loop = asyncio.new_event_loop()
  20. asyncio.set_event_loop(self.__elo.loop)
  21. self.__elo.thread = threading.current_thread()
  22. self.__elo.loop.call_soon_threadsafe(self.__elo.evt_start.set)
  23. self.__elo.loop.run_forever()
  24.  
  25. def __stop(self):
  26. self.__elo.loop.stop()
  27. self.__elo.evt_stop.set()
  28.  
  29. def stop(self):
  30. # self.__elo.loop.call_soon_threadsafe(self.__elo.loop.stop)
  31. assert isinstance(self.__elo.loop, asyncio.AbstractEventLoop)
  32. self.__elo.loop.call_soon_threadsafe(self.__stop)
  33. self.__elo.evt_stop.wait(2.0)
  34.  
  35. def _add_task(self, future, coro):
  36. task = self.__elo.loop.create_task(coro)
  37. future.set_result(task)
  38.  
  39. def add_task(self, coro):
  40. self.__elo.evt_start.wait()
  41. future = Future()
  42. p = functools.partial(self._add_task, future, coro)
  43. self.__elo.loop.call_soon_threadsafe(p)
  44. return future.result() # block until result is available
  45.  
  46. def cancel(self, task):
  47. self.__elo.loop.call_soon_threadsafe(task.cancel)
  48.  
  49. @asyncio.coroutine
  50. def foo(i):
  51. return 2 * i
  52.  
  53. @asyncio.coroutine
  54. def main():
  55. ELO = EventLoopOwner
  56. elo = ELO()
  57. elo.start()
  58.  
  59. # yield from asyncio.sleep(0.1)
  60.  
  61. task = elo.add_task(foo(5))
  62. x = yield from task
  63.  
  64. print(x)
  65. elo.stop(); print("Stopped")
  66. elo.join(); print("Joined")
  67.  
  68. if __name__ == "__main__":
  69. for _ in range(10):
  70. loop = asyncio.new_event_loop()
  71. asyncio.set_event_loop(loop)
  72. assert isinstance(loop, asyncio.AbstractEventLoop)
  73. try:
  74. loop.run_until_complete(main())
  75. finally:
  76. loop.close()
Success #stdin #stdout 0.06s 19024KB
stdin
Standard input is empty
stdout
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined
10
Stopped
Joined