fork download
  1. from collections.abc import AsyncGenerator
  2. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  3.  
  4. engine = create_async_engine(database_url)
  5. AsyncSessionLocal = async_sessionmaker(engine)
  6.  
  7. async def get_db() -> AsyncGenerator[AsyncSession]:
  8. async with AsyncSessionLocal() as async_session:
  9. yield async_session
  10.  
  11. ...
  12.  
  13. @router.get('/', response_model=list[SchemaName])
  14. async def get_list(session: Annotated[AsyncSession, Depends(get_db)]) -> list[ModelName]:
  15. result = await session.scalars(select(ModelName))
  16. return result.all()
Runtime error #stdin #stdout #stderr 0.34s 40308KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
Traceback (most recent call last):
  File "./prog.py", line 2, in <module>
ImportError: cannot import name 'async_sessionmaker' from 'sqlalchemy.ext.asyncio' (/usr/local/lib/python3.9/dist-packages/sqlalchemy/ext/asyncio/__init__.py)