Цель: миллионы строк в Cassandra должны быть извлечены и сжаты в один файл как можно быстрее и эффективнее (ежедневно).
В текущей настройке для запуска задания Spark используется кластер Google Dataproc, который извлекает данные непосредственно в корзину Google Cloud Storage.Я попробовал два подхода:
Использование (в настоящее время не рекомендуется) FileUtil.copyMerge () для объединения примерно 9000 файлов разделов Spark в один несжатый файл, а затем отправка задания Hadoop MapReduceсжать этот единственный файл.
Оставление примерно 9000 файлов разделов Spark в качестве необработанного вывода и отправка задания Hadoop MapReduce для объединения и сжатия этих файлов в один файл.
Некоторые сведения о работе: около 800 миллионов строк.Около 9000 файлов разделов Spark, выведенных заданием Spark.Работа Spark занимает около часа, чтобы завершить работу на кластере Dataproc с 1 главным, 4 рабочими (4 ВЦП, 15 ГБ каждый).Размер блока Dataproc Hadoop по умолчанию, который, я думаю, 128 МБ.
Некоторые сведения о конфигурации Spark:
spark.task.maxFailures=10
spark.executor.cores=4
spark.cassandra.input.consistency.level=LOCAL_ONE
spark.cassandra.input.reads_per_sec=100
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size_in_mb=64
Задание Hadoop:
hadoop jar file://usr/lib/hadoop-mapreduce/hadoop-streaming-2.8.4.jar
-Dmapred.reduce.tasks=1
-Dmapred.output.compress=true
-Dmapred.compress.map.output=true
-Dstream.map.output.field.separator=,
-Dmapred.textoutputformat.separator=,
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-input gs://bucket/with/either/single/uncompressed/csv/or/many/spark/partition/file/csvs
-output gs://output/bucket
-mapper /bin/cat
-reducer /bin/cat
-inputformat org.apache.hadoop.mapred.TextInputFormat
-outputformat org.apache.hadoop.mapred.TextOutputFormat
- Задание Spark заняло около 1 часа, чтобы извлечь данные Cassandra в корзину GCS.,Использование FileUtil.copyMerge (), добавленного примерно за 45 минут к этому, было выполнено кластером Dataproc, но использовала недостаточно ресурсов, так как кажется, что он использует 1 узел.Задание Hadoop для сжатия этого файла заняло дополнительно 50 минут.Это не оптимальный подход, так как кластер должен оставаться дольше, даже если он не использует все свои ресурсы.
Вывод информации с этого задания:
INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=5072098452
FILE: Number of bytes written=7896333915
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
GS: Number of bytes read=47132294405
GS: Number of bytes written=2641672054
GS: Number of read operations=0
GS: Number of large read operations=0
GS: Number of write operations=0
HDFS: Number of bytes read=57024
HDFS: Number of bytes written=0
HDFS: Number of read operations=352
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Killed map tasks=1
Launched map tasks=353
Launched reduce tasks=1
Rack-local map tasks=353
Total time spent by all maps in occupied slots (ms)=18495825
Total time spent by all reduces in occupied slots (ms)=7412208
Total time spent by all map tasks (ms)=6165275
Total time spent by all reduce tasks (ms)=2470736
Total vcore-milliseconds taken by all map tasks=6165275
Total vcore-milliseconds taken by all reduce tasks=2470736
Total megabyte-milliseconds taken by all map tasks=18939724800
Total megabyte-milliseconds taken by all reduce tasks=7590100992
Map-Reduce Framework
Map input records=775533855
Map output records=775533855
Map output bytes=47130856709
Map output materialized bytes=2765069653
Input split bytes=57024
Combine input records=0
Combine output records=0
Reduce input groups=2539721
Reduce shuffle bytes=2765069653
Reduce input records=775533855
Reduce output records=775533855
Spilled Records=2204752220
Shuffled Maps =352
Failed Shuffles=0
Merged Map outputs=352
GC time elapsed (ms)=87201
CPU time spent (ms)=7599340
Physical memory (bytes) snapshot=204676702208
Virtual memory (bytes) snapshot=1552881852416
Total committed heap usage (bytes)=193017675776
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=47132294405
File Output Format Counters
Bytes Written=2641672054
Я ожидал, что это будет работать так же хорошо или лучше, чем другой подход, но он работал намного хуже.Работа Spark осталась неизменной.Пропустив FileUtil.copyMerge () и прыгнув прямо в задание Hadoop MapReduce ... часть задания на карте составляла всего около 50% через полтора часа.Работа была отменена в тот момент, так как было ясно, что она не будет жизнеспособной.
У меня есть полный контроль над заданием Spark и заданием Hadoop.Я знаю, что мы могли бы создать больший кластер, но я бы предпочел сделать это только после того, как удостоверился, что сама работа оптимизирована.Любая помощь приветствуется.Спасибо.