На самом деле кажется, что на это ответили в примере output_materialization .
Вы в основном определяете тип:
@usable_as_dagster_type(
name='LessSimpleDataFrame',
description='A more sophisticated data frame that type checks its structure.',
input_hydration_config=less_simple_data_frame_input_hydration_config,
output_materialization_config=less_simple_data_frame_output_materialization_config,
)
class LessSimpleDataFrame(list):
pass
Этот тип имеет стратегию output_materialization, которая читает config:
def less_simple_data_frame_output_materialization_config(
context, config, value
):
csv_path = os.path.abspath(config['csv']['path'])
# Save data to this path
И вы указываете этот путь в конфигурации:
execute_pipeline(
output_materialization_pipeline,
{
'solids': {
'sort_by_calories': {
'outputs': [
{'result': {'csv': {'path': 'cereal_out.csv'}}}
],
}
}
},
)
Вам все еще нужно придумать имя файла для каждого промежуточного вывода, но вы можете сделать это в config, который может отличаться для каждого прогона, вместо определения его в самом конвейере.