Трубопровод не может найти узлы в Кедро - PullRequest
2 голосов
/ 22 февраля 2020

Я следовал учебнику конвейеров , создал все необходимые файлы, запустил kedro с kedro run --node=preprocessing_data, но застрял с таким сообщением об ошибке:

ValueError: Pipeline does not contain nodes named ['preprocessing_data'].

Если я запускаю kedro без node параметр, я получаю

kedro.context.context.KedroContextError: Pipeline contains no nodes

Содержание файлов:

src/project/pipelines/data_engineering/nodes.py
def preprocess_data(data: SparkDataSet) -> None:
    print(data)
    return
src/project/pipelines/data_engineering/pipeline.py
def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_data,
                inputs="data",
                outputs="preprocessed_data",
                name="preprocessing_data",
            ),
        ]
    )
src/project/pipeline.py
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    de_pipeline = de.create_pipeline()
    return {
        "de": de_pipeline,
        "__default__": Pipeline([])
    }

Ответы [ 2 ]

3 голосов
/ 25 февраля 2020

Mayur c правильно, нет узлов, потому что ваш __default__ конвейер пуст. Другой вариант - запустить только конвейер de с cli.

kedro run --pipeline de

Эту опцию и другие параметры можно найти в тексте справки команды запуска.

$ kedro run --help

Usage: kedro run [OPTIONS]

  Run the pipeline.

Options:
  --from-inputs TEXT        A list of dataset names which should be used as a
                            starting point.
  --from-nodes TEXT         A list of node names which should be used as a
                            starting point.
  --to-nodes TEXT           A list of node names which should be used as an
                            end point.
  -n, --node TEXT           Run only nodes with specified names.
  -r, --runner TEXT         Specify a runner that you want to run the pipeline
                            with.
                            This option cannot be used together with
                            --parallel.
  -p, --parallel            Run the pipeline using the `ParallelRunner`.
                            If
                            not specified, use the `SequentialRunner`. This
                            flag cannot be used together
                            with --runner.
  -e, --env TEXT            Run the pipeline in a configured environment. If
                            not specified,
                            pipeline will run using environment
                            `local`.
  -t, --tag TEXT            Construct the pipeline using only nodes which have
                            this tag
                            attached. Option can be used multiple
                            times, what results in a
                            pipeline constructed from
                            nodes having any of those tags.
  -lv, --load-version TEXT  Specify a particular dataset version (timestamp)
                            for loading.
  --pipeline TEXT           Name of the modular pipeline to run.
                            If not set,
                            the project pipeline is run by default.
  -c, --config FILE         Specify a YAML configuration file to load the run
                            command arguments from. If command line arguments
                            are provided, they will
                            override the loaded ones.
  --params TEXT             Specify extra parameters that you want to pass
                            to
                            the context initializer. Items must be separated
                            by comma, keys - by colon,
                            example:
                            param1:value1,param2:value2. Each parameter is
                            split by the first comma,
                            so parameter values are
                            allowed to contain colons, parameter keys are not.
  -h, --help                Show this message and exit.

Добавлен второй ответ, поскольку полный вывод справки не помещается в комментарии.

3 голосов
/ 23 февраля 2020

Я думаю, что вам нужно иметь конвейер в __default__. например,

def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    de_pipeline = de.create_pipeline()
    return {
        "de": data_engineering_pipeline,
        "__default__": data_engineering_pipeline
    }

Тогда kedro run --node=preprocessing_data у меня работает.

...