Облачные задачи условного исполнения - PullRequest
3 голосов
/ 15 апреля 2020

Я использую облачные задачи. Мне нужно запустить выполнение Задачи C только тогда, когда Задача A и Задача B были успешно выполнены. Поэтому мне нужен какой-то способ чтения / получения уведомлений о статусах запускаемых задач. Но я не вижу способа сделать это в документации GCP. Использование Node.js SDK для создания задач и облачных функций в качестве обработчиков задач, если это вообще помогает.

Редактирование:

По запросу, здесь больше информации о том, что мы делаем:

Задачи 1 - 10, каждая из которых выполняет HTTP-запросы, извлекает данные, обновляет отдельные коллекции в Firestore на основе этих данных. Эти 10 задач могут выполняться параллельно и в произвольном порядке, поскольку они не зависят друг от друга. Все эти задачи фактически реализованы внутри GCF.

Задача 11 фактически зависит от данных коллекции Firestore, обновленных Задачами 1 - 10. Таким образом, она может быть запущена только после успешного выполнения Задач 1 - 10.

Мы выпускаем RunID в качестве общего идентификатора для группировки определенного запуска всех задач (1 - 11).

Ответы [ 2 ]

3 голосов
/ 15 апреля 2020

Cloud Task только запускает задачу, вы можете только определить условие времени. Вы должны вручную кодировать проверку при запуске задачи C.

Вот пример процесса:

  • Задача A выполняется, в конце запись задачи в firestore завершена
  • Задача B выполняется, в в конце, задача записывается в пожарное хранилище, которое завершено
  • Задача C запускается и проверяет, завершены ли A и B в пожарном хранилище.
    • Если нет, то выход из задачи по ошибке
    • Да, продолжить процесс

Вам необходимо настроить задачу C очередь для повторного выполнения задачи в случае ошибки.

Другое, дорогое решение - использовать Cloud Composer для обработки этого рабочего процесса

На данный момент другого решения пока нет об управлении рабочим процессом.

1 голос
/ 16 апреля 2020

Cloud Tasks - не тот инструмент, который вы хотите использовать в этом случае. Взгляните на Облако Composer, которое встроено в Apache Воздушный поток для GCP.

Редактировать: Вы можете создать GCF для обработки состояний этих запросов

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

################ TASK A
taskA_list = [
    "https://via.placeholder.com/400",
    "https://via.placeholder.com/410",
    "https://via.placeholder.com/420",
    "https://via.placeholder.com/430",
    "https://via.placeholder.com/440",
    "https://via.placeholder.com/450",
    "https://via.placeholder.com/460",
    "https://via.placeholder.com/470",
    "https://via.placeholder.com/480",
    "https://via.placeholder.com/490",
]

def call2TaskA(url):
    html = requests.get(url, stream=True)
    return (url,html.status_code)


processes = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in taskA_list:
        processes.append(executor.submit(call2TaskA, url))

isOkayToDoTaskB = True
for taskA in as_completed(processes):
    result = taskA.result()
    if result[1] != 200: # your validation on taskA
        isOkayToDoTaskB = False
    results.append(result)

if not isOkayToDoTaskB:
    raise ValueError('Problems: {}'.format(results))

################ TASK B
def doTaskB():
    pass

doTaskB()
...