fork download
  1. from typing import Callable, Awaitable, Any
  2. import asyncio
  3. import logging
  4.  
  5.  
  6. def create_strict_periodic_task(func: Callable, period: int, revokable=False) -> asyncio.Task: # todo: check revoke
  7. async def inner(f: Callable) -> None:
  8. while True:
  9. try:
  10. if asyncio.iscoroutinefunction(f):
  11. await f()
  12. else:
  13. f()
  14. await asyncio.sleep(period)
  15. except Exception as exc:
  16. if revokable:
  17. logging.exception(f'Error while performing periodic task with {f.__name__} - {exc}')
  18. await asyncio.sleep(5)
  19. else:
  20. raise exc
  21.  
  22. return asyncio.create_task(inner(func))
  23.  
  24.  
  25. def create_adaptive_periodic_task(func: Callable[..., Awaitable[Any]], period: int, revokable=False) -> asyncio.Task:
  26. async def inner(f: Callable) -> None:
  27. while True:
  28. try:
  29. asyncio.create_task(f())
  30. await asyncio.sleep(period)
  31. except Exception as exc:
  32. if revokable:
  33. logging.exception(f'Error while performing periodic task with {f.__name__} - {exc}')
  34. await asyncio.sleep(5)
  35. else:
  36. raise exc
  37.  
  38. return asyncio.create_task(inner(func))
Success #stdin #stdout 0.11s 16200KB
stdin
Standard input is empty
stdout
Standard output is empty