Как избежать запуска остальной части дагстерского конвейера при определенных условиях - PullRequest
1 голос
/ 26 мая 2020

скажем, у меня есть два твердых тела в Dagster, соединенных по конвейеру. Первый solid может выполнять какой-то процесс и генерировать допустимый ввод, чтобы остальная часть конвейера выполнялась, или генерировать недопустимый ввод, который не должен обрабатываться в дальнейшем. Для достижения этого результата я генерирую ошибку, когда данные соответствуют недопустимому условию, поэтому конвейер останавливается, а остальные твердые тела пропускаются.

Возникновение ошибки для решения моего варианта использования кажется хакерским, есть ли способ пропустить выполнение остальной части конвейера, не прибегая к исключениям?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y%2 == 0:
        raise "No even number is further processed"

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())

В этом довольно надуманный пример, solid 2 должен выполняться только тогда, когда вывод из первого solid нечетный.

В моем фактическом варианте использования первый solid опрашивает БД и иногда не находит данных для обработать. В этом случае имеет смысл отмечать выполнение не как неудачное, а как успешное. Можно было бы проверить в каждом нисходящем solid, соответствуют ли данные условиям, но это быстро добавляет шаблон. Было бы идеально иметь способ пропустить выполнение всех последующих твердых тел, когда solid, который получает данные, не находит данных для обработки.

1 Ответ

2 голосов
/ 26 мая 2020

Для достижения желаемого поведения вывод можно пометить как необязательный с помощью аргумента is_required=False в соответствующем OutputDefinition. Это означает, что выходные данные не обязательно должны выдаваться solid.

Если дополнительный выход не предоставляется, все последующие твердые тела, зависящие от выходных данных, будут просто пропущены. Это полезно как для короткого замыкания конвейера, что является вашим вариантом использования, так и для более сложного ветвления logi c. При пропуске твердых тел участки трубопровода не помечаются как неисправные.

Вы использовали подсказки типов для определения типов ввода и вывода, но, поскольку вам нужно указать аргумент is_required, вам нужно использовать явный OuputDefinition.

from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List

def query_db():
    return []

@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
    rows = query_db()

    if len(rows) > 0:
        yield Output(rows, output_name="data")


@solid
def solid_2(context, data: List[int]):
    context.log.info(str(data))
    pass


@pipeline
def my_pipeline():
    solid_2(solid_1())

solid solid_2 также можно определить с помощью InputDefinition вместо подсказок типа. Подсказки типа являются синтаксическими c сахаром для InputDefinitions:

@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
    context.log.info(str(data))
    # Process data
    pass

В качестве побочного примечания: в общем случае исключения - это правильный способ пометить solid как неудачный и не считаются взломанными. в коде Dagster.

...