Apache Бегунок искры / мигания луча не запускается в EMR (файлы доступа из GCS) - PullRequest
0 голосов
/ 05 августа 2020

У меня есть конвейер apache лучей для индексации некоторых данных в elasticsearch. Я пытался использовать Spark или Flink runner для запуска задания в AWS EMR. Когда я пытался запустить задание в автономном режиме при локальной настройке, конвейер работал с исходными файлами на локальном диске, однако, когда я читал файл из GCS, он не работал. То же самое, когда я работаю в кластере EMR.

Конфиги, которые я установил на Had oop core-site. xml как конфигурация EMR

{
    "Classification": "core-site",
    "Properties": {
      "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
      "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "fs.gs.project.id": "data-warehouse",
      "google.cloud.auth.service.account.enable": "true",
      "fs.gs.auth.service.account.json.keyfile": "/home/hadoop/utils/key.json"
    }
  }

Также , GCS-коннектор jar находится в пути Spark jar и имеет oop jar path

POM-файл maven для конвейера

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.company.beam</groupId>
  <artifactId>IndexToEs</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <beam.version>2.22.0</beam.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.3</version>
        <executions>
          <execution>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.company.beam.IndexToEs</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>

      </plugins>

    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>
<!--     https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-extensions-google-cloud-platform-core -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
<!--     https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
<!--      <scope>runtime</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink_2.11</artifactId>
      <version>2.16.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigdataoss</groupId>
      <artifactId>gcs-connector</artifactId>
      <version>hadoop2-1.9.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.3</version>
    </dependency>




    <!-- slf4j API frontend binding with JUL backend -->
<!--    <dependency>-->
<!--      <groupId>org.slf4j</groupId>-->
<!--      <artifactId>slf4j-api</artifactId>-->
<!--      <version>${slf4j.version}</version>-->
<!--    </dependency>-->
<!--    <dependency>-->
<!--      <groupId>org.slf4j</groupId>-->
<!--      <artifactId>slf4j-jdk14</artifactId>-->
<!--      <version>${slf4j.version}</version>-->
<!--    </dependency>-->
  </dependencies>
</project>

Ошибка отсутствует, но EMR показывает задачу завершено, но конвейер не запущен.

Я не мог понять, это проблема apache луча или проблема конфигурации кластера.

1 Ответ

0 голосов
/ 10 августа 2020

Я разобрался в проблеме. Apache beam sdk использует gsutil для доступа к файлам GCS. Согласно документации flink, коннекторы has oop отвечали за доступ к любой другой файловой системе, но в случае луча apache, использующего flink runner, данные считываются с помощью gsutil и передаются в нисходящий поток. Итак, я установил Google could SDK и активировал сервисный аккаунт.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...