Как сделать постоянное соединение с базой данных в FastAPI? - PullRequest
2 голосов
/ 05 августа 2020

Я пишу свой первый проект на FastAPI и немного борюсь. В частности, я не уверен, как я должен использовать пул соединений asyncpg в своем приложении. В настоящее время то, что у меня есть, выглядит так:

в db.py у меня есть

pgpool = None


async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
    return pgpool

, а затем в отдельных файлах я использую get_pool как зависимость.

@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
    # ... do things ...

Во-первых, каждая конечная точка, которую я имею, использует базу данных, поэтому кажется глупым добавлять этот аргумент зависимости для каждой отдельной функции. Во-вторых, это кажется окольным путём. Я определяю глобальный объект, затем я определяю функцию, которая возвращает этот глобальный объект, а затем я ввожу функцию. Я уверен, что есть более естественный способ сделать это.

Я видел, как люди предлагали просто добавить все, что мне нужно, в качестве свойства объекта приложения

@app.on_event("startup")
async def startup():
    app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

, но это не так. работать, когда у меня есть несколько файлов с маршрутизаторами, я не знаю, как получить доступ к объекту приложения из объекта маршрутизатора.

Что мне не хватает?

1 Ответ

1 голос
/ 06 августа 2020

Вы можете использовать шаблон фабрики приложений для настройки вашего приложения.

Чтобы избежать использования глобальных или добавления вещей непосредственно в объект приложения, вы можете создать свою собственную базу данных классов для хранения вашего пула соединений.

Чтобы передать пул соединений на каждый маршрут, вы можете использовать промежуточное ПО и добавить пул в request.state

Вот пример кода:

import asyncio

import asyncpg
from fastapi import FastAPI, Request

class Database():

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

def create_app():

    app = FastAPI()
    db = Database()

    @app.middleware("http")
    async def db_session_middleware(request: Request, call_next):
        request.state.pgpool = db.pool
        response = await call_next(request)
        return response

    @app.on_event("startup")
    async def startup():
        await db.create_pool()

    @app.on_event("shutdown")
    async def shutdown():
        # cleanup
        pass

    @app.get("/")
    async def hello(request: Request):
        print(request.state.pool)

    return app

app = create_app()
...