Лучший способ проверить, не зависают ли потоковые задания Spark - PullRequest
0 голосов
/ 17 октября 2018

У меня есть потоковое приложение Spark, которое в основном получает триггерное сообщение от Kafka, которое запускает пакетную обработку, которая потенциально может занять до 2 часов.

Были случаи, когда некоторые задания зависали на неопределенный срок и не выполнялись.не выполняются в обычное время, и в настоящее время мы не можем выяснить состояние задания, не проверяя интерфейс Spark вручную.Я хочу иметь способ, где текущие рабочие места искры зависают или нет.Так что, в основном, если он зависает более 30 минут, я хочу уведомить пользователей, чтобы они могли принять меры.Какие у меня есть все варианты?

Я вижу, что могу использовать метрики от водителя и исполнителей.Если бы я выбрал самый важный, это были бы последние полученные записи партии.Когда StreamingMetrics.streaming.lastReceivedBatch_records == 0 это, вероятно, означает, что потоковое задание Spark было остановлено или не выполнено.

Но в моем сценарии я получу только 1 событие запуска потоковой передачи, а затем начнется обработка, которая может занять до 2часов, поэтому я не смогу положиться на полученные записи.

Есть ли лучший способ?TIA

Ответы [ 7 ]

0 голосов
/ 18 июля 2019

YARN предоставляет API REST для проверки состояния приложения и состояния использования ресурсов кластера.

при вызове API выдаст список запущенных приложений, время их запуска и другие сведения.у вас может быть простой REST-клиент, который запускается один раз в 30 минут или около того, и проверьте, выполняется ли задание более 2 часов, а затем отправьте простое почтовое оповещение.

Вот документация по API:

https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Applications_API

0 голосов
/ 19 июля 2019

Позвольте мне обратить ваше внимание на слушателей Streaming Query.Это довольно удивительные легкие вещи, которые могут отслеживать ход ваших потоковых запросов.

В приложении с несколькими запросами вы можете выяснить, какие запросы отстают или остановлены из-за какого-то исключения.

Пожалуйста, найдите ниже пример кода, чтобы понять его реализацию.Я надеюсь, что вы можете использовать это и преобразовать этот кусок, чтобы лучше соответствовать вашим потребностям.Спасибо!

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(event: QueryStartedEvent) {
      //logger message to show that the query has started
    }
    override def onQueryProgress(event: QueryProgressEvent) {
      synchronized {
        if(event.progress.name.equalsIgnoreCase("QueryName"))
        {
        recordsReadCount = recordsReadCount + event.progress.numInputRows
        //Logger messages to show continuous progress
        }
      }
    }
    override def onQueryTerminated(event: QueryTerminatedEvent) {
      synchronized {
        //logger message to show the reason of termination.
      }
    }
  })
0 голосов
/ 17 июля 2019

Использование TaskContext

Предоставляет контекстную информацию для задачи и поддерживает добавление прослушивателей для завершения / сбоя задачи (см. AddTaskCompletionListener).

Также доступна более подробная информация, такая как задача «tryNumber» или «taskMetrics».

Эта информация может использоваться вашим приложением во время выполнения, чтобы определить, является ли оно «зависшим» (в зависимости от проблемы).)

Дополнительная информация о том, что означает «зависание», будет полезна для предоставления более конкретного решения.

0 голосов
/ 16 июля 2019

Возможно, простое решение, подобное.

В начале обработки - запустить ожидающий поток.

val TWO_HOURS = 2 * 60 * 60 * 1000

val t = new Thread(new Runnable {
  override def run(): Unit = {
    try {
      Thread.sleep(TWO_HOURS)
      // send an email that job didn't end
    } catch {
      case _: Exception => _
    }
  }
})

И в том месте, где можно сказать, что пакетная обработка закончена

t.interrupt()

Если обработка выполняется в течение 2 часов - нить официанта прерывается и электронное письмо не отправляется.Если обработка не завершена - электронное письмо будет отправлено.

0 голосов
/ 14 июля 2019

Один из способов - контролировать вывод задания зажигания , которое было запущено.Как правило, например,

  • Если он выполняет запись в HDFS, отслеживает выходной каталог HDFS для последнего измененного временного метки файла или сгенерированного количества файлов
  • Если он выполняет запись в базу данных, вы можете иметьзапрос для проверки отметки времени последней записи, вставленной в вашу таблицу вывода задания.
  • Если она выполняет запись в Kafka, вы можете использовать Kafka GetOffsetShell , чтобы получить текущее смещение выходной темы.
0 голосов
/ 13 июля 2019

Я использую Kubernetes в настоящее время с оператором Google Spark.[1]

Некоторые из моих потоковых заданий зависают при использовании Spark 2.4.3: несколько задач не выполняются, тогда текущее пакетное задание не выполняется.

Я установил таймаут с помощью StreamingProgressListenerтак что поток сигнализирует о том, что новая партия не будет отправлена ​​в течение длительного времени.Затем сигнал передается клиенту Pushover, который отправляет уведомление на устройство Android.Тогда System.exit(1) называется.Оператор Spark в конечном итоге перезапустит задание.

[1] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

0 голосов
/ 13 июля 2019

У меня был похожий сценарий, с которым я столкнулся около года назад, и вот что я сделал -

Как только Kafka получит сообщение, spark streaming задание подхватит событие и начнет обработку.

Spark streaming задание отправляет предупреждение email на Support group с сообщением "Событие получено и преобразование искры STARTED".Start timestamp сохраняется.

После завершения обработки / преобразования искры - отправляет предупреждение email в группу поддержки со словами «Преобразование искры ENDED Успешно».End timestamp сохранено.

Приведенные выше 2 шага помогут группе поддержки отследить, не было ли получено электронное письмо об успешной обработке искры, после того, как оно началось, и они могут провести расследование, просмотрев пользовательский интерфейс искры на предмет сбоя задания или отложенной обработки (возможно, заданиязависает из-за недоступности ресурса в течение длительного времени)

Наконец - сохраните идентификатор события или детали в файле HDFS вместе с отметкой времени начала и окончания.И сохраните этот файл по пути HDFS, на который указывает какой-то куст log_table.Это будет полезно для дальнейшего ознакомления с тем, как работает искровой код в течение определенного периода времени, и может быть fine tuned, если требуется.

Надеюсь, это полезно.

...