Да, Dagster поддерживает разветвление твердых частиц на несколько твердых тел и затем разложение на приемник solid (ie для суммирования результатов) в пределах одного конвейера. Ниже приведен пример кода и соответствующая визуализация дага в da git ( full dag и увеличены в ).
@solid
def load_raw_data(_):
yield Output('loaded_data')
@solid
def process_data_into_table(_, raw_data):
yield Output(raw_data)
@solid(
output_defs=[
OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
],
)
def split_into_two_folds(_, table):
yield Output(1, 'fold_one')
yield Output(2, 'fold_two')
@solid
def train_fold(_, fold):
yield Output('model')
@solid
def evaluate_fold(_, model):
yield Output('compute_result')
@composite_solid
def process_fold(fold):
return evaluate_fold(train_fold(fold))
@solid
def summarize_results(context, fold_1_result, fold_2_result):
yield Output('summary_stats')
@pipeline
def ml_pipeline():
fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))
process_fold_one = process_fold.alias('process_fold_one')
process_fold_two = process_fold.alias('process_fold_two')
summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))
В примере кода мы используем псевдонимы, чтобы повторно использовать один и тот же лог c для каждого сгиба. Мы также консолидируем логи c для обработки каждого сгиба в составе solid.
Другим вариантом является программное создание PipelineDefinition напрямую, но я бы порекомендовал выше.