Для достижения желаемого поведения вывод можно пометить как необязательный с помощью аргумента is_required=False
в соответствующем OutputDefinition
. Это означает, что выходные данные не обязательно должны выдаваться solid.
Если дополнительный выход не предоставляется, все последующие твердые тела, зависящие от выходных данных, будут просто пропущены. Это полезно как для короткого замыкания конвейера, что является вашим вариантом использования, так и для более сложного ветвления logi c. При пропуске твердых тел участки трубопровода не помечаются как неисправные.
Вы использовали подсказки типов для определения типов ввода и вывода, но, поскольку вам нужно указать аргумент is_required
, вам нужно использовать явный OuputDefinition
.
from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, OutputDefinition, Output
from typing import List
def query_db():
return []
@solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
def solid_1(context):
rows = query_db()
if len(rows) > 0:
yield Output(rows, output_name="data")
@solid
def solid_2(context, data: List[int]):
context.log.info(str(data))
pass
@pipeline
def my_pipeline():
solid_2(solid_1())
solid solid_2
также можно определить с помощью InputDefinition
вместо подсказок типа. Подсказки типа являются синтаксическими c сахаром для InputDefinitions
:
@solid(input_defs=[InputDefinition('data', List[int])])
def solid_2(context, data):
context.log.info(str(data))
# Process data
pass
В качестве побочного примечания: в общем случае исключения - это правильный способ пометить solid как неудачный и не считаются взломанными. в коде Dagster.