Исключение неуправляемого потока данных - PullRequest
0 голосов
/ 20 ноября 2018

Мы создали очень простую работу в потоке данных.Он читает из BigQuery и периодически сохраняет в RedisCluster.Код конвейера выглядит следующим образом.

    // initialize pipeline
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    // ... read from bigquery
    final String query = "..."; // actual query snipped
    PCollection<TableRow> bigQueryRes = pipeline.apply("Reading from BigQuery table",
            BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());

    // ... store to Redis Cluster
    bigQueryRes.apply("Storing to RedisCluster",
            ParDo.of(new RedisClusterSinkDoFn()));

    // run pipeline
    pipeline.run();

Когда мы напрямую выполняем задание напрямую через командную строку, оно выполняется идеально.

Но когда мы генерируем шаблон из того же JAR-файла, выполняемого через Cloud Function, используя launch api , мы находим следующее исключение:

 java.io.FileNotFoundException: No files matched spec: gs://dataflow-bucket/tmp/BigQueryExtractTemp/0e22ca710a6b45f7aa134cff02b4ae9b/000000000057.avro

    at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult (FileSystems.java:166)
    at org.apache.beam.sdk.io.FileSystems.match (FileSystems.java:153)
    at org.apache.beam.sdk.io.FileBasedSource.createReader (FileBasedSource.java:332)
    at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$1.iterator (WorkerCustomSources.java:362)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:179)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start (ReadOperation.java:160)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:77)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork (BatchDataflowWorker.java:395)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork (BatchDataflowWorker.java:364)
    at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork (BatchDataflowWorker.java:292)

Мы не уверенычто вызывает эту проблему, поскольку ни журналы потока данных, ни BigQuery не помогают.Единственное отличие, которое я могу сказать, заключается в том, что когда задание выполняется успешно, существует папка "gs: // dataflow-bucket / tmp / BigQueryExtractTemp", которая создается с раздельным выводом результата запроса.Папка отсутствует, задание не выполняется.

Есть мысли о том, с чего начать отладку для этого?

Спасибо в ожидании.

1 Ответ

0 голосов
/ 19 декабря 2018

Я исследовал эту проблему и обнаружил, что jobId (для задания BigQuery), используемый в BigQueryIO, создается при создании шаблона.Это означает, что при повторном выполнении для того же шаблона будет сгенерирована ошибка 409, говорящая о том, что задание BigQuery с тем же jobId уже существует.В результате возникает ошибка «java.io.FileNotFoundException: файлы не соответствуют спецификации:», так как не создан временный файл для входных данных BigQuery.

К счастью, эта ошибка была исправлена ​​в версиях Apache Beam 2.x., Документация Google говорит:

"Если вы хотите запустить пакетный конвейер, который читает из BigQuery, вы должны использовать .withTemplateCompatibility() для всех операций чтения BigQuery"

Итак, чтобы исправитьэто добавляет .withTemplateCompatibility() к вашему прочтению BigQueryIO.

Надеюсь, это поможет вам!

...