Spark 2.4 для Elasticsearch: предотвратить потерю данных при выводе из эксплуатации узлов datapro c? - PullRequest
2 голосов
/ 21 января 2020

Моя техническая задача - синхронизировать данные из GCS (Google Cloud Storage) с нашим кластером Elasticsearch.

Мы используем Apache Spark 2.4 с разъемом Elasti c Had oop в Google Datapro c кластер (автоматическое масштабирование включено).

Во время выполнения, если кластер Datapro c уменьшен в размере, все задачи на выведенном из эксплуатации узле будут потеряны, а обработанные данные на этом узле никогда не будут отправлены на elasti c* трассировка стека:

Потерянное задание 50.0 на этапе 2.3 (TID 427, xxxxxxx-sw-vrb7. c .xxxxxxx, исполнитель 43): FetchFailed (BlockManagerId (30, xxxxxxx-w-23). apache .xxxxxxx: 7337

Причина: java. net .Unkn ownHostException: xxxxxxx-w-23. c .xxxxxxx

Задача 50.0 на этапе 2.3 (TID 427) завершилась неудачно, но задача не будет выполнена повторно (либо потому, что задача не удалась при случайном получении данных) сбой, поэтому предыдущий этап необходимо повторно запустить или потому, что другой экземпляр задачи уже выполнен).

Спасибо. Фред

1 Ответ

0 голосов
/ 28 января 2020

Я go расскажу немного о "проблеме уменьшения масштаба" и о том, как ее смягчить. Обратите внимание, что эта информация относится как к уменьшению масштаба вручную, так и к вытесняемым виртуальным машинам с прерыванием.

Фон

Автоматическое масштабирование удаляет узлы в зависимости от объема «доступной» памяти YARN в кластере. Он не учитывает случайные данные в кластере. Вот иллюстрация из недавней презентации, которую мы дали.

В задании в стиле MapReduce (задания Spark представляют собой набор перемешиваний в стиле MapReduce между этапами), данные из всех картографических устройств должны быть получены в все редукторы. Мапперы записывают свои данные в случайном порядке на локальный диск, а затем редукторы извлекают данные из каждого маппера. На каждом узле есть сервер, предназначенный для обслуживания случайных данных, и он работает за пределами YARN. Таким образом, узел может казаться простаивающим в YARN, даже если ему необходимо оставаться рядом, чтобы обслуживать свои данные в случайном порядке.

normal shuffle

Когда удаляется один узел, почти все редукторы потерпят неудачу, так как все они должны получать данные с каждого узла. Редукторы, в частности, потерпят неудачу с FetchFailedException (как вы видели), указывая на то, что они не смогли получить данные случайного типа от определенного узла. Драйвер в конечном итоге повторно запустит необходимые преобразователи, а затем повторно запустит этап сокращения. Spark немного неэффективен (https://issues.apache.org/jira/browse/SPARK-20178), но работает.

shuffle when a node is removed

Обратите внимание, что вы можете потерять узлы в одном из трех сценариев ios:

  1. Преднамеренное удаление узлов (автоматическое масштабирование или ручное масштабирование)
  2. Вытесняемые виртуальные машины прерваны. Вытесняемые виртуальные машины вытесняются по крайней мере каждые 24 часа.
  3. (относительно редко) стандартная виртуальная машина GCE незаметно завершается GCE и перезапускается. Обычно стандартные виртуальные машины прозрачно live migrated .

При создании кластера автоматического масштабирования Datapro c добавляет несколько свойств для повышения устойчивости работы в грань потерянных узлов:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10
spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Обратите внимание, что если вы включите автоматическое масштабирование в существующем кластере, эти свойства не будут установлены. (Но вы можете установить их вручную при создании кластера.)

Смягчения

1) Использовать изящное списание

Datapro c интегрируется с Изящное снятие с эксплуатации YARN , и может быть задано для политик автоматического масштабирования или операций ручного уменьшения масштаба.

При постепенном выводе узла из эксплуатации YARN сохраняет его до тех пор, пока приложения, которые запускали контейнеры на узле, не закончили sh, но не позволяли ему работать новые контейнеры. Это дает узлам возможность обслуживать их данные в случайном порядке перед удалением.

Вам нужно будет убедиться, что время изящного вывода из эксплуатации достаточно велико, чтобы охватить ваши самые длинные задания. Документы для автоматического масштабирования предлагают 1h в качестве отправной точки.

Обратите внимание, что постепенное снятие с эксплуатации действительно имеет смысл только для долгосрочных кластеров , которые обрабатывают множество коротких заданий .

На эфемерных кластерах было бы лучше "правильно выбрать размер" кластера с самого начала или отключить масштабирование, если кластер не находится в режиме ожидания (установите scaleDownMinWorkerFraction=1.0).

2) Избегайте приоритетных виртуальных машин

Даже при использовании постепенного вывода из эксплуатации приоритетные виртуальные машины будут периодически прерываться с помощью «приоритетных операций». GCE гарантирует, что вытесняемые виртуальные машины будут вытеснены в течение 24 часов, а вытеснения в больших кластерах очень распространены.

Если вы используете постепенный вывод из эксплуатации, а сообщения об ошибках FetchFailedException включают -sw-, вы, вероятно, видите извлекать сбои из-за вытеснения узлов.

У вас есть два варианта, чтобы избежать использования вытесняемых виртуальных машин: 1. В вашей политике автоматического масштабирования вы можете установить secondaryWorkerConfig, чтобы иметь 0 минут и максимум экземпляров, и вместо этого поместить всех рабочих в основной группе. 2. В качестве альтернативы, вы можете продолжать использовать «вторичных» работников, но установить --properties dataproc:secondary-workers.is-preemptible.override=false. Это сделает ваши вторичные работники стандартными виртуальными машинами.

3) Долгосрочный: расширенный режим гибкости

Datapro c * Расширенный режим гибкости - это долгосрочный ответ на проблему тасования.

Уменьшение масштаба проблема вызвана случайным хранением данных на локальном диске. EFM будет включать в себя новые реализации в случайном порядке, которые позволяют размещать данные в произвольном порядке на фиксированном наборе узлов (например, только первичные рабочие) или в хранилище вне кластера.

Это сделает вторичных рабочих не имеющими состояния Это означает, что они могут быть удалены в любое время. Это делает автоматическое масштабирование гораздо более привлекательным.

В настоящий момент EFM все еще находится в Alpha и не масштабируется до реальных рабочих нагрузок, но к лету ожидает готовую к выпуску бета-версию.

...