Как вы пишете в HDFS из луча? - PullRequest
0 голосов
/ 02 ноября 2018

Я пытаюсь написать конвейер Beam, который работает с использованием SparkRunner, читает из локального файла и пишет в HDFS.

Вот минимальный пример:

Класс опций -

package com.mycompany.beam.hdfsIOIssue;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface WritingToHDFSOptions extends PipelineOptions, SparkPipelineOptions, HadoopFileSystemOptions {

  @Validation.Required
  @Description("Path of the local file to read from")
  String getInputFile();
  void setInputFile(String value);

  @Validation.Required
  @Description("Path of the HDFS to write to")
  String getOutputFile();
  void setOutputFile(String value);

}

Балка основного класса -

package com.mycompany.beam.hdfsIOIssue;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class WritingToHDFS {

  public static void main(String[] args) {
    PipelineOptionsFactory.register(WritingToHDFSOptions.class);

    WritingToHDFSOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WritingToHDFSOptions.class);

    Pipeline p = Pipeline.create(options);

    buildPipeline(p, options);

    p.run();
  }

  static void buildPipeline(Pipeline p, WritingToHDFSOptions options) {
    PCollection<String> input = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));

    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(options.getOutputFile());
    TextIO.Write write = TextIO.write().to(resource);
    input.apply("WriteLines", write);
  }
}

Запуск, как:

spark-submit test --master yarn --deploy-mode cluster --class com.mycompany.beam.hdfsIOIssue.WritingToHDFS my-project-bundled-0.1-SNAPSHOT.jar --runner=SparkRunner --inputFile=testInput --outputFile=hdfs://testOutput

То, что я ожидаю, произойдет: он читает строки в локальном файле testInput и записывает их в новый файл с именами testOutput в моем домашнем каталоге hdfs.

Что на самом деле происходит: ничего, насколько я могу судить. Spark говорит, что задание успешно завершено, и я вижу шаги Beam в журналах, но нет файла или каталога с именем testOutput, записанных в hdfs или в мой локальный каталог. Может быть, он записывается локально на узлах spark executor, но у меня нет доступа к ним для проверки.

Я предполагаю, что либо я неправильно использую интерфейс TextIO, либо мне нужно сделать больше для настройки файловой системы, а не просто добавить ее в мой интерфейс PipelineOptions. Но я не могу найти документацию, которая объясняет, как это сделать.

1 Ответ

0 голосов
/ 05 ноября 2018

Я думаю, ваши варианты должны выглядеть примерно так:

--inputFile=hdfs:///testInput --outputFile=hdfs:///testOutput

Возможно, вы также захотите дождаться окончания конвейера, чтобы увидеть результат:

p.run().waitUntilFinish();

Вы можете найти простой полный рабочий пример записи HDFS (файлы Avro) здесь

Обратите внимание на ( BEAM-2277 ), который также может применяться в зависимости от версии Beam, с которой вы работаете (это приведет к ошибке). Вы можете обойти это, используя:

TextIO.Write write = TextIO.write().to(resource)
  .withTempDirectory(FileSystems.matchNewResource("hdfs:///tmp/beam-test", true));

Если вы упакуете свой проект в общедоступном репозитории GitHub, я протестирую его и помогу запустить.

...