Независимые искры Kafka Spark (несколько производителей и брокеров) - PullRequest
0 голосов
/ 07 июня 2018

Итак, у меня проблема с Kafka Sinks в Spark Streaming при отправке JSON для нескольких тем и ненадежных брокеров kafka .Вот некоторые части кода:

val kS = KafkaUtils.createDirectStream[String, TMapRecord]
(ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))

Затем я перебираю RDD

kSMapped.foreachRDD {
  rdd: RDD[TMsg] => {
    rdd.foreachPartition {
      part => {
        part.foreach { ........... 

И внутри foreach я делаю

kafkaSink.value.send(kafkaTopic, strJSON)

kafkaSinkMirror.value.send(kafkaTopicMirrorBroker, strJSON)

Когда Зеркальный брокер не работает, все потоковое приложение ждет его, и мы ничего не отправляем главному брокеру.

Как бы вы справились с этим?

Для самого простого решения, которое вы предлагаете, представьтечтобы я просто пропустил сообщения, которые должны были быть отправлены брокеру, который вышел из строя (скажем, это CASE 1)

для CASE 2 мы бы сделали некоторую буферизацию.

PS ПозжеЯ буду использовать Kafka Mirror, но в настоящее время у меня нет такой опции, поэтому мне нужно найти какое-то решение в моем коде.

1 Ответ

0 голосов
/ 08 июня 2018

Я нашел несколько решений этой проблемы:

  1. Вы можете использовать выбрасывание любого исключения тайм-аута для работника и контрольных точек .Spark несколько раз пытается перезапустить плохую задачу, описанную в свойстве spark.task.maxFailures.Можно увеличить количество повторных попыток.Если потоковое задание не выполняется после максимального числа повторных попыток, просто перезапустите задание с контрольной точки, когда брокер станет доступен.Или вы можете вручную остановить работу, если она не будет выполнена .
  2. Вы можете настроить противодавление spark.streaming.backpressure.enabled=true, которое позволит получать данные только с такой скоростью, с которой они могут их обработать.
  3. Вы можете отправить вам два результата обратно в техническую тему Kafka и обработать их позже с помощью другого потокового задания.
  4. Вы можете создать буфер Hive или Hbase для этих случаев и отправить необработанные данные позже в пакете.режим.
...