Почему Spark обработал одни и те же данные дважды? - PullRequest
0 голосов
/ 28 июня 2018

Я попал в странную ситуацию, когда простейшее из возможных искровых приложений, казалось бы, дважды выполняло одну и ту же работу.

Что я сделал

Приложение само выполняет запрос:

SELECT date, field1, field2, ..., field10
FROM table1
WHERE field1 = <some number>
  AND date BETWEEN date('2018-05-01') AND date('2018-05-30')
  ORDER BY 1

и сохраняет результаты в HDFS.

Таблица table1 - это набор файлов паркета, хранящихся в HDFS и разделенных следующим образом

/root/date=2018-05-01/hour=0/data-1.snappy.parquet
/root/date=2018-05-01/hour=0/data-2.snappy.parquet
...
/root/date=2018-05-01/hour=1/data-1.snappy.parquet
...
/root/date=2018-05-02/hour=0/data-1.snappy.parquet
...
etc.

Все файлы паркета имеют размер от 700M до 2G и имеют одинаковую схему: 10 ненулевых полей типов int или bigint.

Результат приложения крошечный - всего несколько тысяч строк.

Мое искровое приложение работало на YARN в кластерном режиме. Базовые параметры искры были

spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster

Во время выполнения пара контейнеров была выгружена, ошибок и сбоев не было. Вся заявка завершена за одну попытку.

Странная вещь

Скриншоты из Spark UI:

  • main screen
  • stage 2
  • stage 4

Как можно видеть, этапы 2 и 4 обрабатывали одинаковое количество входных строк, но этап 4 также выполнял некоторую перестановку (это были строки результата). Неудачные задачи - это те, контейнеры которых были выгружены.

Похоже, мое приложение обрабатывало одни и те же файлы дважды.

Понятия не имею, как это возможно и что случилось. Пожалуйста, помогите мне понять, почему Spark делает такие странные вещи.

Фактический физический план:

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- Coalesce 16
   +- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
         +- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
            +- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
               +- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...

Вот DAG для этапов 2 и 4:

  • stage 2
  • stage 4

Ответы [ 2 ]

0 голосов
/ 26 мая 2019

Я столкнулся с точно такой же проблемой, и оказалось, что это совершенно нормальное поведение.

Я наблюдал такое поведение в задании Spark, которое просто читает из HDFS, выполняет некоторую облегченную обработку и использует метод orderBy для сортировки столбца перед обратной записью в HDFS. В пользовательском интерфейсе Spark я видел две работы, которые сканировали бы всю таблицу 6 ТБ, так же как и вы. Первая работа занимала очень мало памяти, не записывала записи в случайном порядке и не записывала записи в HDFS.

Оказывается, основная причина заключается в том, что перед фактической сортировкой данных Spark выполняет операцию выборки, которая помогает ему определить RangePartitioner, который он использует для разделения данных для своего алгоритма сортировки: Ему необходимо знать примерный диапазон данных в столбце, который определяет ключ сортировки для определения товара RangePartitioner.

Эта операция упоминается в этом блоге:

https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

это сообщение StackOverflow:

Как Spark достигает порядка сортировки?

, а также в великой книге Холдена Карау и Рэйчел Уорран " High Performance Spark " на стр. 143.

В моем случае я знаю диапазон клавиш, поэтому мне пришло в голову, что я в принципе должен быть в состоянии определить RangePartitioner a priori . Тем не менее, я копался в исходном коде Spark для его метода sort, но я не нашел никакого обходного пути, где я мог бы явно передать диапазон.

0 голосов
/ 29 июня 2018

Я все еще не уверен, почему искра ведет себя так, и я все еще копаю, но мне удалось получить , что происходит.

Примечание: мой SQL заканчивается на ORDER. Поскольку ожидается, что задание будет возвращать очень мало строк, я подумал, что окончательная сортировка должна быть простой задачей.

Итак, когда я удаляю предложение ORDER, мой запрос выполняется должным образом и считывает паркет только один раз. Это странное поведение воспроизводимо независимо от того, насколько велик набор данных и сколько раз задачи прерываются во время выполнения: добавление условия order заставляет искру сканировать дважды весь набор данных (по крайней мере, так выглядит).

Я забыл упомянуть: я использую spark 2.3.0 из дистрибутива Hortonworks (HDP-2.6.5).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...