import threading
import asyncio
from concurrent.futures import Future
import functools
class EventLoopOwner(threading.Thread):
    """Thread that creates, runs and finally closes its own asyncio event loop.

    After ``start()``, other threads hand coroutines to the loop with
    ``add_task()`` and shut it down with ``stop()``.  ``add_task()`` and
    ``stop()`` block the calling thread, so they must not be called from the
    loop's own thread.
    """

    class __Properties:
        """Per-instance state, grouped under a single name-mangled attribute."""

        def __init__(self, loop, thread, evt_start, evt_stop):
            self.loop = loop            # event loop created in run(); None until then
            self.thread = thread        # thread object executing run(); None until then
            self.evt_start = evt_start  # set from inside the loop once it is running
            self.evt_stop = evt_stop    # set when run() is finished with the loop

    def __init__(self):
        super().__init__()
        self.__elo = self.__Properties(
            None, None, threading.Event(), threading.Event()
        )

    def run(self):
        """Thread body: create the loop, run it until stopped, then close it."""
        state = self.__elo
        try:
            loop = asyncio.new_event_loop()
            state.loop = loop
            try:
                asyncio.set_event_loop(loop)
                state.thread = threading.current_thread()
                # Queued before run_forever(), so evt_start is only set once
                # the loop is really processing callbacks.
                loop.call_soon_threadsafe(state.evt_start.set)
                loop.run_forever()
            finally:
                # Release the selector and self-pipe owned by the loop.
                loop.close()
        finally:
            # Signalled last so that stop() returns only after the loop has
            # been stopped and closed (or failed to come up at all).
            state.evt_stop.set()

    def stop(self):
        """Ask the loop to stop and wait up to two seconds for it to finish.

        Calling ``stop()`` again after the loop has finished is a no-op.

        Raises:
            RuntimeError: if the thread was never started.
        """
        state = self.__elo
        if self.ident is None:
            raise RuntimeError("EventLoopOwner.stop() called before start()")
        if state.evt_stop.is_set():
            return
        # The loop object is created inside run(); wait until it is running.
        state.evt_start.wait()
        state.loop.call_soon_threadsafe(state.loop.stop)
        state.evt_stop.wait(2.0)

    def _add_task(self, future, coro):
        """Create a task for ``coro`` on the loop and report it via ``future``.

        Runs inside the loop thread.  A failure to create the task is stored
        in ``future`` so that the thread blocked in ``add_task()`` is woken
        with the error instead of waiting forever.
        """
        try:
            task = self.__elo.loop.create_task(coro)
        except Exception as exc:
            future.set_exception(exc)
        else:
            future.set_result(task)

    def add_task(self, coro):
        """Schedule ``coro`` on the owned loop and return the created task.

        Blocks the calling thread until the loop is running and the task has
        been created.

        Raises:
            Exception: whatever ``loop.create_task()`` raised for ``coro``
                (for example ``TypeError`` when ``coro`` is not a coroutine).
        """
        self.__elo.evt_start.wait()
        future = Future()
        self.__elo.loop.call_soon_threadsafe(self._add_task, future, coro)
        return future.result()  # block until the loop thread has answered

    def cancel(self, task):
        """Request cancellation of ``task`` from the loop thread."""
        loop = self.__elo.loop
        if loop.is_closed():
            # The loop is gone, so the task can never run again; keep this a
            # silent no-op rather than raising "Event loop is closed".
            return
        loop.call_soon_threadsafe(task.cancel)
async def foo(i):
    """Coroutine that returns ``2 * i``.

    Written as a native coroutine: the ``asyncio.coroutine`` decorator was
    deprecated in Python 3.8 and removed in 3.11.
    """
    return 2 * i
async def main():
    """Demo: run ``foo(5)`` on an EventLoopOwner thread and print the result.

    The coroutine executes on the worker thread's loop.  Its outcome is handed
    back through a ``concurrent.futures.Future``, which may be completed from
    any thread, and awaited here via ``asyncio.wrap_future``.  Awaiting the
    worker's Task directly would tie this coroutine to a future that belongs
    to a different event loop.

    Note that ``add_task()``, ``stop()`` and ``join()`` block this
    coroutine's loop while they wait for the worker thread.
    """
    elo = EventLoopOwner()
    elo.start()
    try:
        outcome = Future()

        async def relay():
            # Runs on the worker loop; forwards foo's result or error.
            try:
                value = await foo(5)
            except Exception as exc:
                outcome.set_exception(exc)
            else:
                outcome.set_result(value)

        elo.add_task(relay())
        x = await asyncio.wrap_future(outcome)
        print(x)
    finally:
        # Always shut the worker down; a left-over non-daemon thread would
        # keep the process alive.
        elo.stop()
        print("Stopped")
        elo.join()
        print("Joined")
if __name__ == "__main__":
    # Run the demo ten times in a row, each time on a brand-new event loop
    # that is closed again whether or not the run succeeded.
    remaining = 10
    while remaining > 0:
        remaining -= 1
        demo_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(demo_loop)
        assert isinstance(demo_loop, asyncio.AbstractEventLoop)
        try:
            demo_loop.run_until_complete(main())
        finally:
            demo_loop.close()