Apache Beam для Google Cloud DataFlow - 404 ошибки при использовании BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED - PullRequest
0 голосов
/ 01 мая 2019

У меня возникла проблема при запуске Beam DataFlow (написанного на Java) для ввода данных в Google BigQuery. Эта проблема возникает в моей среде разработки, а также в производственной среде.

Я обрабатываю несколько элементов данных через конвейер Beam в отдельных потоках. Я обрабатываю, используя ParDo, который преобразует полученные данные, используя apply вместо PCollection. Затем для преобразованных данных я пытаюсь записать их в Google BigQuery.

Я пытаюсь записать свои преобразованные данные в BigQuery с помощью процедуры, которая выглядит следующим образом:

transformedData
   .apply("Load fact data",
         BigQueryIO.<ValidatedDataRecord>write()
         .to(new LoadDataFact.DynamicFactTableDestination(dataType.label))
         .withFormatFunction(new LoadDataFact.FactSerializationFn())
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Важно отметить, что я использую следующее:

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED

... потому что я хочу создать новую таблицу, если она еще не существует

Проблема в том, что когда новые данные становятся частью потока данных, и эти данные требуют нового набора данных для данных, я часто (но не всегда) получаю следующую ошибку 404:

Exception thrown in class : com.myOrg.myPackage.myClass Error : java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : "Not found: Table my-project:my-dataset.my-table",
"reason" : "notFound"
} ],
"message" : "Not found: Table my-project:my-dataset.my-table",
"status" : "NOT_FOUND"
}

Я исследовал, что происходит в базовой библиотеке Java Core Beam SDK. Происходит следующее: при обработке 1-го нового элемента данных библиотека CreateTables.class в библиотеке Java Core ядра Beam SDK должна попытаться создать новую таблицу, и в случае успеха она добавляет запись в статическую коллекцию CreateTables.createdTables для новая таблица, чтобы указать, что таблица создана. Кажется, что ошибка 404 возникает после создания таблицы (хотя иногда таблица не может быть полностью создана). Я не уверен, что вызывает такое поведение, или что вызывает ошибку 404 (сообщение об ошибке не дает много информации). Этот тип ошибки в BigQuery обычно возникает из-за того, что таблица не существует ни на каком этапе доступа к ней.

CreateTables.class является частью следующей библиотеки Java Core ядра Beam SDK:

C:\Users\my.username\.m2\repository\org\apache\beam\beam-sdks-java-io-google-cloud-platform\2.5.0\beam-sdks-java-io-google-cloud-platform-2.5.0.jar!\org\apache\beam\sdk\io\gcp\bigquery\CreateTables.class

Я поднял эту ошибку на Beam Jira - https://issues.apache.org/jira/browse/BEAM-7195

Я попытался обновить библиотеку Java Core Beam SDK до версии 2.12.0, но по какой-то причине поток данных вообще перестал работать.

Есть ли способ, которым я могу обойти эту проблему? Я использую пользовательский класс для переопределения класса DynamicDestinations, который Beam использует для обработки (именно так в документации указано, что динамические адресаты данных должны быть реализованы - см. https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-dynamic-destinations).

Мой пользовательский класс выглядит следующим образом, и в процессе каждого элемента данных вызывается метод getTable, чтобы определить таблицу, к которой следует добавить данные:

public class LoadDataFact {

    public static class DynamicFactTableDestination extends DynamicDestinations<ValidatedDataRecord, String> {
        private static final long serialVersionUID = -1234561111111123456L;
        private static final String projectID = "my-project";
        private String dataType = "none";
        private String elementDuration = "unknown";

        public DynamicFactTableDestination(String dataType) {
            this.dataType = dataType;
        }

        @Override
        public String getDestination(ValueInSingleWindow<ValidatedDataRecord> element) {    

            return element.getValue().DatasetName;
        }

        @Override
        public TableDestination getTable(String destination) {

            try {
                return new TableDestination(new TableReference()
                        .setProjectId(projectID)
                        .setDatasetId(destination)
                        .setTableId(String.format("data_for_%s",this.dataType)), "Data staging table",
                        new TimePartitioning()
                                .setType("DAY")
                                .setField("created_date_time"));
            }
            catch (Exception ex) {
                System.out.println("Error " + ex.getMessage());
            }

            return null;
        }

Вот мой pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.myOrg.myNamespace</groupId>
  <artifactId>my-artifact-name</artifactId>
  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.5.0</beam.version>
    <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
        <plugin>
           <groupId>org.eclipse.m2e</groupId>
           <artifactId>lifecycle-mapping</artifactId>
           <version>1.0.0</version>
           <configuration>
              <lifecycleMappingMetadata>
                 <pluginExecutions>
                    <pluginExecution>
                       <pluginExecutionFilter>
                          <groupId>
                             org.apache.maven.plugins
                          </groupId>
                          <artifactId>
                             maven-compiler-plugin
                          </artifactId>
                          <versionRange>
                             [@maven-compiler-plugin.version@,)
                          </versionRange>
                          <goals>
                             <goal>compile</goal>
                             <goal>testCompile</goal>
                          </goals>
                       </pluginExecutionFilter>
                       <action>
                          <ignore></ignore>
                       </action>
                    </pluginExecution>
                 </pluginExecutions>
              </lifecycleMappingMetadata>
           </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>


     <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.5.0</version>
     </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>


    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.msgpack</groupId>
      <artifactId>msgpack-core</artifactId>
      <version>0.8.16</version>
    </dependency>
  </dependencies>
</project>
...