Проблема комплектации Apache Beam - PullRequest
0 голосов
/ 20 марта 2019

Моя проблема в следующем, я хочу объединить некоторые данные, которые хранятся на S3.В качестве исходного ввода в мой конвейер я использую текстовый файл, который содержит путь ко всем файлам S3, которые должны быть агрегированы.

 PCollection<String> readInputPipeline = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
 readInputPipeline = readInputPipeline.apply(ParDo.of(new ReadFromS3Mapper()));

Входной файл имеет 346 тыс. Строк.Когда я внедряю этот код в кластер Spark, чтение из S3 выглядит так, как будто это происходит только в двух задачах Spark, хотя доступно много ядер.Можно ли как-нибудь увеличить параллельность этой операции?

enter image description here

Я запускаю это на EMR на Amazon с мастером (m3.xlarge) и базовый компьютер (R3.4xlarge) со следующими параметрами:

"spark-submit"
  "--driver-java-options='-Dspark.yarn.app.container.log.dir=/mnt/var/log/hadoop'",
  "--master", "yarn",
  "--executor-cores","16",
  "--executor-memory","6g"

PS: может быть, решение может заключаться в том, что я не должен выполнять такие дорогостоящие операции ввода-вывода в этом контексте?

1 Ответ

0 голосов
/ 20 марта 2019

Spark решает, как разделить входные данные, здесь решено просмотреть весь файл за один раз, потому что он такой маленький.

Я сделал нечто подобное в приложении distcp ;здесь используется класс Spark ParallelCollectionRDD , чтобы явно указать искре разделить список по очереди.

Этого класса должно быть достаточно для того, чтобы вы сделали что-то подобное - вам, возможно, придется прочитать исходный текстовый файл локально в список, а затем передать список конструктору ParallelCollectionRDD

...