Как оптимизировать Hadoop MapReduce, сжимая результаты Spark в Google Datproc? - PullRequest
0 голосов
/ 27 декабря 2018

Цель: миллионы строк в Cassandra должны быть извлечены и сжаты в один файл как можно быстрее и эффективнее (ежедневно).

В текущей настройке для запуска задания Spark используется кластер Google Dataproc, который извлекает данные непосредственно в корзину Google Cloud Storage.Я попробовал два подхода:

  1. Использование (в настоящее время не рекомендуется) FileUtil.copyMerge () для объединения примерно 9000 файлов разделов Spark в один несжатый файл, а затем отправка задания Hadoop MapReduceсжать этот единственный файл.

  2. Оставление примерно 9000 файлов разделов Spark в качестве необработанного вывода и отправка задания Hadoop MapReduce для объединения и сжатия этих файлов в один файл.

Некоторые сведения о работе: около 800 миллионов строк.Около 9000 файлов разделов Spark, выведенных заданием Spark.Работа Spark занимает около часа, чтобы завершить работу на кластере Dataproc с 1 главным, 4 рабочими (4 ВЦП, 15 ГБ каждый).Размер блока Dataproc Hadoop по умолчанию, который, я думаю, 128 МБ.

Некоторые сведения о конфигурации Spark:

spark.task.maxFailures=10
spark.executor.cores=4

spark.cassandra.input.consistency.level=LOCAL_ONE
spark.cassandra.input.reads_per_sec=100
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size_in_mb=64

Задание Hadoop:

hadoop jar file://usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar
-Dmapred.reduce.tasks=1
-Dmapred.output.compress=true
-Dmapred.compress.map.output=true
-Dstream.map.output.field.separator=,
-Dmapred.textoutputformat.separator=,
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs
-output gs://output/bucket
-mapper /bin/cat
-reducer /bin/cat
-inputformat org.apache.hadoop.mapred.TextInputFormat
-outputformat org.apache.hadoop.mapred.TextOutputFormat
  1. Задание Spark заняло около 1 часа, чтобы извлечь данные Cassandra в корзину GCS.,Использование FileUtil.copyMerge (), добавленного примерно за 45 минут к этому, было выполнено кластером Dataproc, но использовала недостаточно ресурсов, так как кажется, что он использует 1 узел.Задание Hadoop для сжатия этого файла заняло дополнительно 50 минут.Это не оптимальный подход, так как кластер должен оставаться дольше, даже если он не использует все свои ресурсы.

Вывод информации с этого задания:

INFO mapreduce.Job: Counters: 55
File System Counters
    FILE: Number of bytes read=5072098452
    FILE: Number of bytes written=7896333915
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    GS: Number of bytes read=47132294405
    GS: Number of bytes written=2641672054
    GS: Number of read operations=0
    GS: Number of large read operations=0
    GS: Number of write operations=0
    HDFS: Number of bytes read=57024
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=352
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=0
Job Counters 
    Killed map tasks=1
    Launched map tasks=353
    Launched reduce tasks=1
    Rack-local map tasks=353
    Total time spent by all maps in occupied slots (ms)=18495825
    Total time spent by all reduces in occupied slots (ms)=7412208
    Total time spent by all map tasks (ms)=6165275
    Total time spent by all reduce tasks (ms)=2470736
    Total vcore-milliseconds taken by all map tasks=6165275
    Total vcore-milliseconds taken by all reduce tasks=2470736
    Total megabyte-milliseconds taken by all map tasks=18939724800
    Total megabyte-milliseconds taken by all reduce tasks=7590100992
Map-Reduce Framework
    Map input records=775533855
    Map output records=775533855
    Map output bytes=47130856709
    Map output materialized bytes=2765069653
    Input split bytes=57024
    Combine input records=0
    Combine output records=0
    Reduce input groups=2539721
    Reduce shuffle bytes=2765069653
    Reduce input records=775533855
    Reduce output records=775533855
    Spilled Records=2204752220
    Shuffled Maps =352
    Failed Shuffles=0
    Merged Map outputs=352
    GC time elapsed (ms)=87201
    CPU time spent (ms)=7599340
    Physical memory (bytes) snapshot=204676702208
    Virtual memory (bytes) snapshot=1552881852416
    Total committed heap usage (bytes)=193017675776
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=47132294405
File Output Format Counters 
    Bytes Written=2641672054 
Я ожидал, что это будет работать так же хорошо или лучше, чем другой подход, но он работал намного хуже.Работа Spark осталась неизменной.Пропустив FileUtil.copyMerge () и прыгнув прямо в задание Hadoop MapReduce ... часть задания на карте составляла всего около 50% через полтора часа.Работа была отменена в тот момент, так как было ясно, что она не будет жизнеспособной.

У меня есть полный контроль над заданием Spark и заданием Hadoop.Я знаю, что мы могли бы создать больший кластер, но я бы предпочел сделать это только после того, как удостоверился, что сама работа оптимизирована.Любая помощь приветствуется.Спасибо.

1 Ответ

0 голосов
/ 03 января 2019

Можете ли вы предоставить более подробную информацию о вашей работе Spark?Какой API Spark вы используете - RDD или Dataframe?Почему бы не выполнить фазу слияния полностью в Spark (с repartition (). Write ()) и избежать цепочки заданий Spark и MR?

...