Apache Beam - ParquetIO + SparkRunner (выпуск для чтения) - PullRequest
1 голос
/ 07 мая 2020

В настоящее время я использую приложение Apache Beam, используя sparkRunner в нашем локальном Cloudera Had oop Cluster, я использую Apache Beam 2.16 и Apache Искра 2,4 . У меня есть 2 версии одного и того же конвейера, одна чтение из данных AVRO, а другая чтение из Parquet (код ниже)

PCollection<GenericRecord> records = pipeline
  .apply("Reading",ParquetIO.read(SCHEMA)
  .from("/foo/bar"));

records.apply("Writing",AvroIO.writeGenericRecords(SCHEMA)
  .to(options.getOutputPath())
  .withSuffix(".avro"));

Мы наблюдали неожиданное поведение между чтением выполнения avro и parquet. При чтении AVRO в DAG присутствуют следующие этапы (снимок экрана 1) enter image description here

мы читаем данные размером 2 ГБ, 15 файлов (сжатых с помощью Snappy), но запускаем один и тот же конвейер (но чтение паркета), мы увидели, что этапы разные, и есть дополнительный этап перераспределения, выполняемый в одной задаче (снимок экрана 2) enter image description here

Выполнение конвейер не работает в случае данных Parquet, потому что эта "рабочая перегрузка" (отдельная задача), я попытался добавить несколько дополнительных искровых флагов, но это все еще не удается. Пример одного выполнения с этими дополнительными флагами:

spark2-submit --master yarn --class my.class.MainClass \
--driver-memory 8G \
--executor-cores 5 \
--driver-cores 5 \
--conf spark.driver.memory=10G \
--conf spark.executor.memory=10G \
--conf spark.executor.memoryOverhead=2000 \
--conf spark.driver.memoryOverhead=2000 \
--conf spark.yarn.am.memoryOverhead=2000 \
--conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
--conf spark.network.timeout=1200 \
--conf spark.speculation=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.shuffle.spill=true \
--conf spark.shuffle.spill.compress=true \
--conf spark.io.compression.codec=snappy \
--conf spark.executor.heartbeatInterval=10000000 \
--conf spark.network.timeout=10000000 \
--conf spark.default.parallelism=100 \
/parent/path/program.jar \
--inputPath="/this/is/the/input/path" \
--outputPath="/this/is/the/output/path" \
--runner=SparkRunner

Если кто-то может помочь мне с каким-то обходным путем, чтобы исправить эту «рабочую перегрузку» чтения данных паркета, будет очень полезно :)

...