Проблема с Google Dataflow - PullRequest
       45

Проблема с Google Dataflow

2 голосов
/ 09 июля 2020

Мы недавно внедряем DataWareHouse в Google bigquery, и все наши источники находятся в примитивных базах данных. Таким образом, мы используем поток данных для ETL и Maven с Apache Beam SDK, чтобы запустить 30 конвейеров в службе Google Cloud Dataflow.

package com.google.cloud.teleport.templates; 
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class MToBQ {

  private static ValueProvider<String> maybeDecrypt(
      ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
    return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
  }
  public static void main(String[] args) {
    JdbcConverters.JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(JdbcConverters.JdbcToBigQueryOptions.class);

    run(options);
  }
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    pipeline
       
        .apply(
            "source",
            DynamicJdbcIOMiles.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIOMiles.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("select * from abcc")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            "Target",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_ABC"));
 pipeline
        .apply(
            "SOURCE",
            DynamicJdbcIOMiles.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIOMiles.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("SELECT * FROM XYZ")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
 
        .apply(
            "TARGET",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_XYZ")); 
    return pipeline.run();
  }
}

если данных в таблицах меньше, то это работает успешно. Если данные в миллионах, это вызывает ошибку, как показано ниже

org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out

Чтобы скомпилировать и запустить основной метод класса Java с аргументами, я выполняю следующую команду.

mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.MToBQ \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project= dev-27 \
--region=australia-southeast1 \
--workerMachineType=n1-highmem-8 \
--workerDiskType=compute.googleapis.com/projects/dev-27/zones/australia-southeast1-c/diskTypes/pd-ssd \
--diskSizeGb=50 \
--stagingLocation=gs://dev-dataset/Data/stagingCustomDataFlow/MToBQ \
--tempLocation=gs://dev-dataset/Data/temp \
--templateLocation=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json \
--experiments=upload_graph \
--runner=DataflowRunner" **

Пожалуйста, дайте мне знать, прав ли я. каковы правильные аргументы и может ли поток данных выполнять несколько конвейеров параллельно?

...