Cannot set the BigQuery dataset region when using the Direct Runner with Apache Beam - PullRequest
May 04, 2018

I can't set the region for a BigQuery dataset when using the Direct Runner with Apache Beam.

I'm trying to read data from Oracle with JdbcIO.read in Apache Beam and push it into a BigQuery table.

The problem is that I have to use the Direct Runner, because the Oracle database is on-premises, and the BigQuery dataset has to be in Asia rather than in the default us-central location. So I can't push the data, because my target dataset doesn't exist in the default location.

I tried extending GcpOptions, but it didn't help. Please don't suggest DataflowRunner, because it can't load data from the on-premises Oracle database. Thanks.

      pipeline
      .apply("Read JDBC", JdbcIO.<TableRow>read()
          .withDataSourceConfiguration(DataSourceConfiguration.create(
              DRIVER_CLASS_NAME,
              HikariDataSourceModule.getJdbcUrl())
              .withUsername(HikariDataSourceModule.getUserName())
              .withPassword(HikariDataSourceModule.getPassword()))
          .withQuery(sql)
          .withRowMapper((RowMapper<TableRow>) ResultSetConvertUtil::convertToTableRow)
          .withCoder(TableRowJsonCoder.of())
      )
      .setCoder(TableRowJsonCoder.of())
      .apply("Write BigQuery", BigQueryIO.writeTableRows()
          .withoutValidation()
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withSchema(tableSchema)
          .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
          .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) value -> {
            String dayString = (DateTimeFormatter.ofPattern("yyyyMMdd"))
                .withZone(ZoneId.of("UTC")).format(Instant.now());
            return new TableDestination(
                bigqueryDestTable.value() + "_" + dayString, // Table name
                "Output for day " + dayString // Table description
            );
          }));
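The `.to(...)` lambda above names each destination table by appending the current UTC date to `bigqueryDestTable`. Here is a minimal sketch of that naming logic pulled into a plain helper so it can be checked against a fixed instant. The class and method names (`DailyTableName`, `forDay`) are hypothetical. The base spec `momovn-test:thaodo.MIS_ACCOUNT` is an assumption inferred from the log, not taken from the original code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring the naming used in the .to(...) lambda:
// "<base table spec>_<yyyyMMdd of the given instant in UTC>".
public class DailyTableName {
  private static final DateTimeFormatter DAY_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC"));

  // Appends the UTC calendar day of `at` to the base table spec.
  public static String forDay(String baseTableSpec, Instant at) {
    return baseTableSpec + "_" + DAY_FORMAT.format(at);
  }

  public static void main(String[] args) {
    // Base spec assumed from the log output (project:dataset.table).
    String base = "momovn-test:thaodo.MIS_ACCOUNT";
    // 02:42 UTC corresponds to the 9:42 AM (UTC+7) run shown in the log.
    System.out.println(forDay(base, Instant.parse("2018-05-08T02:42:00Z")));
  }
}
```

Because the original lambda calls `Instant.now()` for every element, a run that crosses midnight UTC could split rows between two daily tables. On a machine at UTC+7 (which matches the log timestamps), a run before 07:00 local time would also get the previous day's suffix.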

Here is the log I get when trying to push data to a dataset in asia-northeast1:

May 08, 2018 9:41:55 AM vn.com.momo.analytics.JdbcToBigQuery main
INFO: creating option
May 08, 2018 9:41:58 AM vn.com.momo.analytics.JdbcToBigQuery main
INFO: creating pipeline
May 08, 2018 9:41:58 AM vn.com.momo.analytics.JdbcToBigQuery main
INFO: getting connection
May 08, 2018 9:41:58 AM vn.com.momo.analytics.JdbcToBigQuery main
INFO: SQL :SELECT * FROM mis_admin.MIS_ACCOUNT WHERE ROWNUM <=10 
May 08, 2018 9:42:00 AM vn.com.momo.analytics.JdbcToBigQuery main
INFO: running pipeline
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create load job with id prefix 2fec46a118634a7f8e67d6f8753d1512_28f0784b6adf4a72e2a27dc45e17b5a9_00001_00000, reached max retries: 3, last failed load job: {
  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_IF_NEEDED",
      "destinationTable" : {
        "datasetId" : "thaodo",
        "projectId" : "momovn-test",
        "tableId" : "MIS_ACCOUNT_20180508"
      },
      "schema" : {
        "fields" : [ {
          "name" : "ID",
          "type" : "FLOAT"
        }, {
          "name" : "ACC_PHONE",
          "type" : "STRING"
        }, {
          "name" : "PRIMARY",
          "type" : "STRING"
        }, {
          "name" : "PROFILE_ID",
          "type" : "FLOAT"
        }, {
          "name" : "START_DATE",
          "type" : "TIMESTAMP"
        }, {
          "name" : "END_DATE",
          "type" : "TIMESTAMP"
        } ]
      },
      "sourceFormat" : "NEWLINE_DELIMITED_JSON",
      "sourceUris" : [ "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/1e21b450-92cb-4a5b-962b-883333a60701", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/22cd1016-8d8f-43e9-941e-6c4346351bef", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/4f78e230-007e-4e9e-85bf-7f2547951385", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/85db96b6-6410-4711-8ac1-274c3314e28f", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/5990d321-e710-42df-9573-dc1360f5b154", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/677062b1-91a0-433b-937e-ca68b8c13692", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/9c1520bc-a5dd-411e-aa04-b6e138fb8897", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/9573f1d0-1742-4e04-8a2c-cbf9cd9f990e", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/40a5b885-be18-4de5-8b1a-18cd4a7e2705", "gs://test-hieu/tmp/BigQueryWriteTemp/2fec46a118634a7f8e67d6f8753d1512/99bf2a23-b82d-444a-b286-2e9667b4e180" ],
      "writeDisposition" : "WRITE_TRUNCATE"
    }
  },
  "etag" : "\"8-EGZnPk12W9PiVn0ZRpZVdMBYs/RYObXstf1bVDNaEj4k7hDizD8UQ\"",
  "id" : "momovn-test:2fec46a118634a7f8e67d6f8753d1512_28f0784b6adf4a72e2a27dc45e17b5a9_00001_00000-2",
  "jobReference" : {
    "jobId" : "2fec46a118634a7f8e67d6f8753d1512_28f0784b6adf4a72e2a27dc45e17b5a9_00001_00000-2",
    "projectId" : "momovn-test"
  },
  "kind" : "bigquery#job",
  "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/momovn-test/jobs/2fec46a118634a7f8e67d6f8753d1512_28f0784b6adf4a72e2a27dc45e17b5a9_00001_00000-2",
  "statistics" : {
    "creationTime" : "1525747335309",
    "endTime" : "1525747335436",
    "startTime" : "1525747335436"
  },
  "status" : {
    "errorResult" : {
      "message" : "Not found: Dataset momovn-test:thaodo. Please verify that the dataset exists and the correct location was used for the job.",
      "reason" : "notFound"
    },
    "errors" : [ {
      "message" : "Not found: Dataset momovn-test:thaodo. Please verify that the dataset exists and the correct location was used for the job.",
      "reason" : "notFound"
    } ],
    "state" : "DONE"
  },
  "user_email" : "etl-bigquery@momovn-test.iam.gserviceaccount.com"
}.
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:342)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:312)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:206)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at vn.com.momo.analytics.JdbcToBigQuery.main(JdbcToBigQuery.java:195)
Caused by: java.lang.RuntimeException: Failed to create load job with id prefix 2fec46a118634a7f8e67d6f8753d1512_28f0784b6adf4a72e2a27dc45e17b5a9_00001_00000, reached max retries: 3, last failed load job: {
  ... (same load job JSON as in the exception message above) ...
}.
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:289)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.access$600(WriteTables.java:79)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$WriteTablesDoFn.processElement(WriteTables.java:158)
ERROR: Non-zero return code '1' from command: Process exited with status 1.

1 Answer

June 13, 2018

Note that .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED), described here, creates the table but not the dataset.

The dataset must exist before the code runs. You choose its location when you create it.
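Since the dataset has to exist before the pipeline runs, one option is to create it once, with an explicit location, outside Beam. Below is a minimal sketch against the BigQuery REST API `datasets.insert` method that uses only the JDK (Java 11+ `java.net.http`). It assumes an OAuth access token is supplied in a hypothetical `BQ_ACCESS_TOKEN` environment variable, for example from `gcloud auth print-access-token`. The class and method names are illustrative. The official `google-cloud-bigquery` client, or `bq --location=asia-northeast1 mk --dataset momovn-test:thaodo`, would do the same job. A dataset's location cannot be changed after creation, so it has to be right the first time.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: create a BigQuery dataset in a specific location via the REST API
// (datasets.insert). Intended to be run once, before starting the Beam pipeline.
public class CreateDatasetWithLocation {

  // Builds the Dataset resource JSON. Identifiers are assumed to contain only
  // letters, digits, '_' and '-', so no JSON escaping is done here.
  public static String datasetInsertBody(String projectId, String datasetId, String location) {
    return "{\"datasetReference\":{\"projectId\":\"" + projectId
        + "\",\"datasetId\":\"" + datasetId
        + "\"},\"location\":\"" + location + "\"}";
  }

  public static void main(String[] args) throws Exception {
    String projectId = "momovn-test";    // taken from the log
    String datasetId = "thaodo";         // taken from the log
    String location = "asia-northeast1"; // target region from the question
    String body = datasetInsertBody(projectId, datasetId, location);
    System.out.println(body);

    // Hypothetical env var holding an OAuth 2.0 access token.
    String token = System.getenv("BQ_ACCESS_TOKEN");
    if (token == null) {
      System.out.println("BQ_ACCESS_TOKEN not set; request not sent.");
      return;
    }
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("https://bigquery.googleapis.com/bigquery/v2/projects/"
            + projectId + "/datasets"))
        .header("Authorization", "Bearer " + token)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // 200 on success; a 409 typically means the dataset already exists.
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```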

...