Получена неправильная запись для GROUP TOPIC PARTITION даже после попытки смещения OFFSET - PullRequest
0 голосов
/ 25 июня 2018

Я использую Spark Streaming, и внезапно я получаю это сообщение при попытке потоковой передачи темы. Как я могу пропустить эту ошибку?

Caused by: java.lang.AssertionError: assertion failed: Got wrong record for GROUP TOPIC 109 even after seeking to offset 754809
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:90)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:222)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:988)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:697)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

1 Ответ

0 голосов
/ 21 декабря 2018

Это не фактический ответ, но он не уместился в комментарии. Кроме того, это не исправление, просто обойти.

Spark также удерживает смещения и проверяет целостность при использовании сообщений. Редко случается, что состояние смещения, поддерживаемое в Spark Streaming API, не совпадает с тем, что имеет Kafka. Вы можете проверить целостность смещений:

kafka-simple-consumer-shell --broker-list BROKER:9092 --clientId GROUP_ID --offset 752000 --print-offsets --max-messages 1000 --topic TOPIC | grep offset

Здесь 752000 - закрытие смещения, но перед неудачным вы видите исключение.

Вы можете перебрать вывод и посмотреть, в порядке ли смещения в Кафке.

Однако в нашем случае смещения в Кафке были просто хорошими. У нас был сбой в Кафке, и нам пришлось восстанавливать, восстанавливая журналы. Таким образом, подход, который мы использовали, заключался в том, чтобы просто пропустить смещения до точки, когда состояние Spark Streaming совпадает с Kafka.

Для этого мы использовали инструмент kt как

kt group -brokers BROKER:9092 -topic TOPIC -group GROUP_ID -partitions 113 -reset 753000

Здесь в разделе 113 есть проблема смещения (вы можете найти его из исключения), а 753000 - это возможное смещение, которое, как вы догадываетесь, должно быть в дальнейшем. Иногда вам нужно повторить процесс и перезапустить работу, чтобы прийти к выводу, что все в порядке.

Этот процесс полностью экспериментальный, потому что в сообщении не указано, какое смещение отсутствует. Следовательно, исходя из вашего требования, сколько данных потерять, все в порядке, вы можете выбрать число до или после смещения, указанного в журнале. Например, если в сообщении журнала напечатано смещение 752900, вы можете пропустить ошибку, установив ее на 752800 (ошибочное смещение раньше) или установить более раннюю, например 752950. В последнем случае пропускается 50 сообщений.

...