Создание файлов в дагстере без заботы о имени файла - PullRequest
0 голосов
/ 20 февраля 2020

В уроке по дагстеру в разделе Материализации мы выбираем имя файла (sorted_cereals_csv_path) для нашего промежуточного вывода, а затем выдаем его в качестве материализации:

@solid
def sort_by_calories(context, cereals):

    # Sort the data (removed for brevity)

    sorted_cereals_csv_path = os.path.abspath(
        'calories_sorted_{run_id}.csv'.format(run_id=context.run_id)
    )
    with open(sorted_cereals_csv_path, 'w') as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    yield Materialization(
        label='sorted_cereals_csv',
        description='Cereals data frame sorted by caloric content',
        metadata_entries=[
            EventMetadataEntry.path(
                sorted_cereals_csv_path, 'sorted_cereals_csv_path'
            )
        ],
    )
    yield Output(None)

Тем не менее, это зависит от того факта, что мы можем использовать локальную файловую систему (что может быть неверно), она, вероятно, будет перезаписана при последующих запусках (а это не то, что я хочу), и это также заставляет нас придумать имя файла который никогда не будет использован.

В большинстве моих тел я хотел бы просто сказать: «Это файловый объект, пожалуйста, сохраните его для меня», не обращая внимания на где это будет сохранено. Могу ли я материализовать файл, не учитывая все эти вещи? Должен ли я использовать python tempfile средство для этого?

1 Ответ

0 голосов
/ 20 февраля 2020

На самом деле кажется, что на это ответили в примере output_materialization .

Вы в основном определяете тип:

@usable_as_dagster_type(
    name='LessSimpleDataFrame',
    description='A more sophisticated data frame that type checks its structure.',
    input_hydration_config=less_simple_data_frame_input_hydration_config,
    output_materialization_config=less_simple_data_frame_output_materialization_config,
)
class LessSimpleDataFrame(list):
    pass

Этот тип имеет стратегию output_materialization, которая читает config:

def less_simple_data_frame_output_materialization_config(
    context, config, value
):
    csv_path = os.path.abspath(config['csv']['path'])

    # Save data to this path

И вы указываете этот путь в конфигурации:

    execute_pipeline(
        output_materialization_pipeline,
        {
            'solids': {
                'sort_by_calories': {
                    'outputs': [
                        {'result': {'csv': {'path': 'cereal_out.csv'}}}
                    ],
                }
            }
        },
    )

Вам все еще нужно придумать имя файла для каждого промежуточного вывода, но вы можете сделать это в config, который может отличаться для каждого прогона, вместо определения его в самом конвейере.

...