В настоящее время я использую приложение Apache Beam, используя sparkRunner в нашем локальном Cloudera Had oop Cluster, я использую Apache Beam 2.16 и Apache Искра 2,4 . У меня есть 2 версии одного и того же конвейера, одна чтение из данных AVRO, а другая чтение из Parquet (код ниже)
PCollection<GenericRecord> records = pipeline
.apply("Reading",ParquetIO.read(SCHEMA)
.from("/foo/bar"));
records.apply("Writing",AvroIO.writeGenericRecords(SCHEMA)
.to(options.getOutputPath())
.withSuffix(".avro"));
Мы наблюдали неожиданное поведение между чтением выполнения avro и parquet. При чтении AVRO в DAG присутствуют следующие этапы (снимок экрана 1)
мы читаем данные размером 2 ГБ, 15 файлов (сжатых с помощью Snappy), но запускаем один и тот же конвейер (но чтение паркета), мы увидели, что этапы разные, и есть дополнительный этап перераспределения, выполняемый в одной задаче (снимок экрана 2)
Выполнение конвейер не работает в случае данных Parquet, потому что эта "рабочая перегрузка" (отдельная задача), я попытался добавить несколько дополнительных искровых флагов, но это все еще не удается. Пример одного выполнения с этими дополнительными флагами:
spark2-submit --master yarn --class my.class.MainClass \
--driver-memory 8G \
--executor-cores 5 \
--driver-cores 5 \
--conf spark.driver.memory=10G \
--conf spark.executor.memory=10G \
--conf spark.executor.memoryOverhead=2000 \
--conf spark.driver.memoryOverhead=2000 \
--conf spark.yarn.am.memoryOverhead=2000 \
--conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
--conf spark.network.timeout=1200 \
--conf spark.speculation=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.shuffle.spill=true \
--conf spark.shuffle.spill.compress=true \
--conf spark.io.compression.codec=snappy \
--conf spark.executor.heartbeatInterval=10000000 \
--conf spark.network.timeout=10000000 \
--conf spark.default.parallelism=100 \
/parent/path/program.jar \
--inputPath="/this/is/the/input/path" \
--outputPath="/this/is/the/output/path" \
--runner=SparkRunner
Если кто-то может помочь мне с каким-то обходным путем, чтобы исправить эту «рабочую перегрузку» чтения данных паркета, будет очень полезно :)