DataflowRunner требует gcpTempLocation, но не удалось получить значение из PipelineOptions - PullRequest
0 голосов
/ 08 июля 2019

Я создаю демонстрационный конвейер для загрузки CSV-файла в BigQuery с Dataflow, используя мою бесплатную учетную запись Google. Это то, с чем я сталкиваюсь.

Когда я читаю из файла GCS и просто регистрирую данные, это работает отлично. ниже приведен мой пример кода.

Этот код работает нормально

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("project12345");
options.setStagingLocation("gs://mybucket/staging");
options.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://mybucket/charges.csv")).apply(ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                LOG.info(c.element());
            }

}));

Однако, когда я добавляю папку временной папки с путем к созданной корзине, я получаю сообщение об ошибке, ниже мой код.


        LOG.debug("Starting Pipeline");
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("project12345");
        options.setStagingLocation("gs://mybucket/staging");
        options.setTempLocation("gs://project12345/temp");
        options.setJobName("csvtobq");
        options.setRunner(DataflowRunner.class);

        DataflowRunner.fromOptions(options);
        Pipeline p = Pipeline.create(options);

        boolean isStreaming = false;
        TableReference tableRef = new TableReference();
        tableRef.setProjectId("project12345");
        tableRef.setDatasetId("charges_data");
        tableRef.setTableId("charges_data_id");

        p.apply("Loading Data from GCS", TextIO.read().from("gs://mybucket/charges.csv"))
                .apply("Convert to BiqQuery Table Row", ParDo.of(new FormatForBigquery()))
                .apply("Write into Data in to Big Query",
                        BigQueryIO.writeTableRows().to(tableRef).withSchema(FormatForBigquery.getSchema())
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                                .withWriteDisposition(isStreaming ? BigQueryIO.Write.WriteDisposition.WRITE_APPEND
                                        : BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

        p.run().waitUntilFinish();
    } 

Когда я запускаю это, я получаю следующую ошибку

Exception in thread "main" java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:242)
    at demobigquery.StarterPipeline.main(StarterPipeline.java:74)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://project12345/temp. 
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:247)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:228)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
    at com.sun.proxy.$Proxy15.getGcpTempLocation(Unknown Source)
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:240)

Это проблема с аутентификацией? Потому что я использую учетные данные json в качестве владельца проекта из GCP через плагин Eclipse Dataflow.

Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

0 голосов
/ 09 июля 2019

Это может быть связано с параметром потоковой передачи, который вы устанавливаете. CSV-загрузки автоматически устанавливаются как пакетные задания. Следовательно, если вы пытаетесь установить его как поток, это может вызвать проблемы.

Если вы настаиваете на потоковой передаче, ознакомьтесь с этой документацией .

0 голосов
/ 08 июля 2019

Похоже на сообщение об ошибке, выданное [1]. Валидатор GCS по умолчанию реализован в [2]. Как вы можете видеть, код луча также вызывает исключение для IllegalArgumentException. Таким образом, вы можете проверить стек дальше для исключения, произошедшего в GcsPathValidator.

[1] https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java#L278

[2] https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java#L29

...