import asyncio
class AsyncDescriptor:
@staticmethod
async def fetch(url):
print('fetching', url)
# For demo just return:
return 'contents of page at the specified URL'
def __init__(self, url):
self._url = url
def __get__(self, obj, cls=None):
# Generate some async future here
return self.fetch(self._url)
class Test:
value = AsyncDescriptor('https://e...content-available-to-author-only...e.com')
async def main():
test = Test()
coroutine = test.value # a coroutine that can be schduled as a task
# or simply awaited
loop = asyncio.get_running_loop()
task = loop.create_task(coroutine)
... # schedule other tasks perhaps?
result = await task
print(result)
if __name__ == '__main__':
asyncio.run(main())
aW1wb3J0IGFzeW5jaW8KCmNsYXNzIEFzeW5jRGVzY3JpcHRvcjoKICAgIEBzdGF0aWNtZXRob2QKICAgIGFzeW5jIGRlZiBmZXRjaCh1cmwpOgogICAgICAgIHByaW50KCdmZXRjaGluZycsIHVybCkKICAgICAgICAjIEZvciBkZW1vIGp1c3QgcmV0dXJuOgogICAgICAgIHJldHVybiAnY29udGVudHMgb2YgcGFnZSBhdCB0aGUgc3BlY2lmaWVkIFVSTCcKCiAgICBkZWYgX19pbml0X18oc2VsZiwgdXJsKToKICAgICAgICBzZWxmLl91cmwgPSB1cmwKCiAgICBkZWYgX19nZXRfXyhzZWxmLCBvYmosIGNscz1Ob25lKToKICAgICAgICAjIEdlbmVyYXRlIHNvbWUgYXN5bmMgZnV0dXJlIGhlcmUKICAgICAgICByZXR1cm4gc2VsZi5mZXRjaChzZWxmLl91cmwpCgpjbGFzcyBUZXN0OgogICAgdmFsdWUgPSBBc3luY0Rlc2NyaXB0b3IoJ2h0dHBzOi8vZS4uLmNvbnRlbnQtYXZhaWxhYmxlLXRvLWF1dGhvci1vbmx5Li4uZS5jb20nKQoKCmFzeW5jIGRlZiBtYWluKCk6CiAgICB0ZXN0ID0gVGVzdCgpCiAgICBjb3JvdXRpbmUgPSB0ZXN0LnZhbHVlICAjIGEgY29yb3V0aW5lIHRoYXQgY2FuIGJlIHNjaGR1bGVkIGFzIGEgdGFzawogICAgIyBvciBzaW1wbHkgYXdhaXRlZAogICAgbG9vcCA9IGFzeW5jaW8uZ2V0X3J1bm5pbmdfbG9vcCgpCiAgICB0YXNrID0gbG9vcC5jcmVhdGVfdGFzayhjb3JvdXRpbmUpCiAgICAuLi4gICMgc2NoZWR1bGUgb3RoZXIgdGFza3MgcGVyaGFwcz8KICAgIHJlc3VsdCA9IGF3YWl0IHRhc2sKICAgIHByaW50KHJlc3VsdCkKCmlmIF9fbmFtZV9fID09ICdfX21haW5fXyc6CiAgICBhc3luY2lvLnJ1bihtYWluKCkp