Итак, у меня проблема с Kafka Sinks в Spark Streaming при отправке JSON для нескольких тем и ненадежных брокеров kafka .Вот некоторые части кода:
val kS = KafkaUtils.createDirectStream[String, TMapRecord]
(ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))
Затем я перебираю RDD
kSMapped.foreachRDD {
rdd: RDD[TMsg] => {
rdd.foreachPartition {
part => {
part.foreach { ...........
И внутри foreach я делаю
kafkaSink.value.send(kafkaTopic, strJSON)
kafkaSinkMirror.value.send(kafkaTopicMirrorBroker, strJSON)
Когда Зеркальный брокер не работает, все потоковое приложение ждет его, и мы ничего не отправляем главному брокеру.
Как бы вы справились с этим?
Для самого простого решения, которое вы предлагаете, представьтечтобы я просто пропустил сообщения, которые должны были быть отправлены брокеру, который вышел из строя (скажем, это CASE 1)
для CASE 2 мы бы сделали некоторую буферизацию.
PS ПозжеЯ буду использовать Kafka Mirror, но в настоящее время у меня нет такой опции, поэтому мне нужно найти какое-то решение в моем коде.