Асинхронный метод не выполняется в фоновом режиме в приложении Flask - PullRequest
0 голосов
/ 02 мая 2018

У меня есть GraphQL API, созданный с помощью пакетов Flask и Graphene, работающих на Python 3.5.4 . Одна из мутаций GraphQL занимает некоторое время для выполнения (от 2 до 5 минут), и я не хочу, чтобы конечный пользователь ждал завершения его выполнения. Я хотел бы, чтобы метод мутации выполнялся в фоновом режиме и мгновенно возвращал сообщение пользователю.

Я изучил пакет asyncio , но по какой-то причине выполнение все еще остается на первом плане, и мой сценарий ждет. Не могли бы вы понять, что я делаю не так? Сценарий довольно длинный, поэтому ниже я привел краткое изложение ключевых элементов, связанных с asyncio.

Файл mutation_migration_plan.py

from migration_script import Migration, main
import asyncio
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
    [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(migration_plan))
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

Файл миграции_script.py

class Migration():
    """Class to execute migration of Plan, Step, Object."""

    @staticmethod
    async def migrate(migration_plan, migration_step=None, migration_object=None):
        [...]

async def main(migration_plan, migration_step=None, migration_object=None):
    asyncio.ensure_future(Migration.migrate(migration_plan, migration_step, migration_object))

В основном я ожидаю увидеть print('Migration plan execution has started. You will receive an e-mail when it is terminated.') почти мгновенно в моем окне консоли, в то время как метод loop.run_until_complete(main(migration_plan)), но сейчас это не тот случай, и печать появляется только в конце выполнения.

[ОБНОВЛЕНО]

Следуя ответу @Vincent ниже, я обновил первый файл Файл mutation_migration_plan.py для использования ThreadPoolExecutor и удалил все, что связано с асинхронностью, из обоих файлов.

Файл mutation_migration_plan.py

from migration_script import Migration
from concurrent.futures import ThreadPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(Migration.migrate, migration_plan)
        # print(future.result())
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

Мой скрипт работает нормально, когда я добавляю строку print(future.result()), но он не выполняется в фоновом режиме (имеет смысл, так как я пытаюсь распечатать результаты). Однако, когда я закомментирую печать, мой метод Migration.migrate, кажется, не выполняется должным образом (я знаю это, потому что я не вижу результатов в моей базе данных). Есть идеи почему?

[ОБНОВЛЕНИЕ BIS]

Файл mutation_migration_plan.py

Мне удалось выполнить мой метод асинхронно, используя ProcessPoolExecutor и удалив все ссылки на asyncio в обоих файлах. Смотрите следующий код:

Файл mutation_migration_plan.py

from concurrent.futures import ProcessPoolExecutor
[...]

class executeMigrationPlan(graphene.Mutation):
    """Mutation to execute a migration plan."""
    [...]

    @staticmethod
    def mutate(root, info, input):
        [...]
        # Execute migration asynchronously
        print('Execute migration asynchronously')
        executor = ProcessPoolExecutor()
        executor.submit(Migration.migrate, migration_plan.id)
        print('Migration plan execution has started. You will receive an e-mail when it is terminated.')

        ok = True
        message = 'Migration plan execution has started. You will receive an e-mail when it is terminated.'
        return executeMigrationPlan(ok=ok, message=message)

Это работает, но хотя процесс выполняется в бэкэнде, моему приложению Falsk очень долго отправляется ответ http, и ответ иногда пуст.

1 Ответ

0 голосов
/ 02 мая 2018

Это распространенное заблуждение, но вы не можете подключить asyncio к существующему приложению и ожидать, что оно будет работать. В asyncio каждый блокирующий вызов должен использовать синтаксис await в контексте сопрограммы. Это единственный способ добиться однопоточного параллелизма. Это означает, что вам придется использовать aiohttp вместо колбы вместе с библиотекой вроде aiohttp-graphql .

Это потребует серьезной переписки вашего приложения. Если вы не хотите проходить через это, есть другие решения, которые хорошо интегрируются с колбой. Вы можете использовать сельдерей , как указано @dirn, или одного из исполнителей, предоставленных concurrent.futures .

...