fork(1) download
  1. import asyncio
  2.  
  def create_timer(future, ms):
      """Schedule `future` to be awaited once, after a delay of `ms` milliseconds.

      Must be called while an event loop is running, because the work is
      wrapped in a task via ``asyncio.create_task``.

      Args:
          future: An awaitable (coroutine object, Task or Future) to await
              once the delay has elapsed.
          ms: Delay in milliseconds before `future` is awaited.

      Returns:
          The ``asyncio.Task`` driving the timer. Its result is the result of
          `future`; cancel the task to abort the timer.
      """
      async def inner():
          await asyncio.sleep(ms / 1000)
          return await future

      # create_task() requires a coroutine object, so inner must be *called*;
      # passing the bare function raises TypeError.
      return asyncio.create_task(inner())
  8.  
  9.  
  def create_interval(future, ms):
      """Start a task that repeats "sleep `ms` milliseconds, then await" forever.

      Must be called while an event loop is running. Each round prints
      ``'<i> round'``, sleeps for ``ms / 1000`` seconds and then awaits the
      work described by `future`.

      Args:
          future: Either a zero-argument callable returning a fresh awaitable
              (for example a coroutine function), which is invoked every
              round, or a single awaitable. A single awaitable is wrapped with
              ``asyncio.ensure_future`` on first use, so it runs once and later
              rounds simply re-await its outcome.
          ms: Interval between rounds, in milliseconds.

      Returns:
          The ``asyncio.Task`` running the loop; cancel it to stop the interval.
      """
      async def inner():
          # A bare coroutine object can be awaited only once; awaiting it again
          # raises RuntimeError and would silently end this task. It is wrapped
          # lazily (after the first sleep) so its start time is unchanged.
          shared = None
          i = 0
          while True:
              print(f'{i} round')
              await asyncio.sleep(ms / 1000)
              if callable(future):
                  await future()
              else:
                  if shared is None:
                      shared = asyncio.ensure_future(future)
                  await shared
              i += 1

      return asyncio.create_task(inner())
  19.  
  20.  
  async def test_func():
      """Demo coroutine: print a start marker, pause one second, print an end marker."""
      pause_seconds = 1  # stands in for some i/o operation
      print('START TEST FUNC')
      await asyncio.sleep(pause_seconds)
      print('END TEST FUNC')
  25.  
  26.  
  async def main():
      """Run the demo: start the interval task, cancel it after 3 s, then idle 3 s."""
      print('START MAIN')
      # test_func() is evaluated once here, so create_interval receives a
      # single coroutine object together with a 500 ms interval.
      interval_task = create_interval(test_func(), 500)
      await asyncio.sleep(3)  # long await
      interval_task.cancel()
      print('TASK CANCELLED')
      # Keep the loop alive a little longer after the cancellation.
      await asyncio.sleep(3)
      print('END MAIN')
  35.  
  36.  
  # Script entry point: run the demo coroutine on a fresh event loop.
  if __name__ == '__main__':
      asyncio.run(main())
  39.  
  40.  
  41.  
  42.  
Success #stdin #stdout 0.13s 16064KB
stdin
Standard input is empty
stdout
START MAIN
0 round
START TEST FUNC
END TEST FUNC
1 round
TASK CANCELLED
END MAIN