Перекрестная проверка с использованием Dagster - PullRequest
0 голосов
/ 01 апреля 2020

Я начал использовать Dagster в нашем ML-конвейере и столкнулся с некоторыми базовыми c проблемами, которые меня интересуют, если я что-то упустил здесь тривиально или это просто так ...

Скажем, у меня есть простой конвейер ML:

Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.

Линейная модель прямолинейна в Дагстере. Но что, если я хочу добавить немного l oop, скажем для целей перекрестной проверки:

Load raw data --> Process data into table --> Split into k folds, and for each fold:
  - fold 1: train model --> evaluate
  - fold 2: train model --> evaluate
  - fold 3: train model --> evaluate
  --> summarize cross validation results.

Есть ли хороший и чистый способ сделать это в Dagster? Я делаю так:

Load raw data --> Process data into table --> Split into K folds --> choose fold k --> train model --> evaluate model

со сгибом "k" в качестве входного параметра для конвейера. А затем запустить конвейер K раз.

Что мне здесь не хватает?

1 Ответ

1 голос
/ 08 мая 2020

Да, Dagster поддерживает разветвление твердых частиц на несколько твердых тел и затем разложение на приемник solid (ie для суммирования результатов) в пределах одного конвейера. Ниже приведен пример кода и соответствующая визуализация дага в da git ( full dag и увеличены в ).

@solid
def load_raw_data(_):
    yield Output('loaded_data')


@solid
def process_data_into_table(_, raw_data):
    yield Output(raw_data)


@solid(
    output_defs=[
        OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
        OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
    ],
)
def split_into_two_folds(_, table):
    yield Output(1, 'fold_one')
    yield Output(2, 'fold_two')


@solid
def train_fold(_, fold):
    yield Output('model')


@solid
def evaluate_fold(_, model):
    yield Output('compute_result')


@composite_solid
def process_fold(fold):
    return evaluate_fold(train_fold(fold))


@solid
def summarize_results(context, fold_1_result, fold_2_result):
    yield Output('summary_stats')


@pipeline
def ml_pipeline():
    fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))

    process_fold_one = process_fold.alias('process_fold_one')
    process_fold_two = process_fold.alias('process_fold_two')

    summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))

В примере кода мы используем псевдонимы, чтобы повторно использовать один и тот же лог c для каждого сгиба. Мы также консолидируем логи c для обработки каждого сгиба в составе solid.

Другим вариантом является программное создание PipelineDefinition напрямую, но я бы порекомендовал выше.

...