Apache Storm с использованием Kafka Spout выдает ошибку: IllegalStateException - PullRequest
0 голосов
/ 03 декабря 2018
Version Info: 
   "org.apache.storm" % "storm-core" % "1.2.1" 
   "org.apache.storm" % "storm-kafka-client" % "1.2.1" 

У меня есть топология шторма, которая выглядит следующим образом:

boltA -> boltB -> boltC -> boltD

boltA просто делает некоторыеформатирование запросов и выдача другого кортежа.boltB выполняет некоторую обработку и генерирует около 100 кортежей для каждого принятого кортежа.boltC и boltD обрабатывают эти кортежи.Все болты реализуют BaseBasicBolt.

. Я замечаю, что когда boltD помечает некоторый tuple как сбой и помечает как повтор, выбрасывая FailedException, через несколько минут меньше, чем истекло время моей топологии,Я получаю следующую ошибку:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

Похоже, что это происходит, когда boltB испускает 100 из 1 кортежа, а boltD не может выполнить один из кортежей из этих 100 кортежей.получаю эту ошибку.Не в состоянии понять, как это исправить, в идеале он должен ack исходный кортеж, когда все 100 кортежей acked, но, вероятно, оригинальный кортеж равен acked до того, как все эти 100 кортежей acked, что вызывает эту ошибку.

Редактировать:

Я могу воспроизвести это с использованием следующей топологии с двумя болтами. Это воспроизводится примерно через 5 минут работы в кластерном режиме:

БолтA

case class Abc(index: Int, rand: Boolean)

class BoltA  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val inp = input.getBinaryByField("value").getObj[someObj]
    val randomGenerator = new Random()

    var i = 0
    val rand = randomGenerator.nextBoolean()
    1 to 100 foreach {
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i += 1
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("boltAout"))
  }

}

БолтB

class BoltB  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val abc = input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received ${abc.index}th tuple in BoltB")
    if(abc.index >= 97 && abc.rand){
      println(s"throwing FailedException for ${abc.index}th tuple for")
      throw new FailedException()
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  }
}

КафкаВыход:

private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
    ))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()

Другая конфигурация:

messageTimeoutInSecons: 300

enter image description here

1 Ответ

0 голосов
/ 22 декабря 2018

Исправление было предоставлено @ Стигом Роде Доссингом здесь .Точная причина проблемы была описана здесь как показано ниже:

В исправлении для STORM-2666 и последующих действий мы добавили логику для обработки случаев, когда носик получил подтверждениедля смещения после того, как следующие смещения уже были подтверждены.Проблема заключалась в том, что носик мог зафиксировать все подтвержденные смещения, но не скорректировать положение потребителя вперед или не очистить waitToEmit должным образом.Если зафиксированное смещение было достаточно далеко за смещением конца журнала, то излив может закончить опрос для смещений, которые он уже зафиксировал.

Исправление немного неправильное.Когда потребительская позиция опускается после зафиксированного смещения, мы обязательно корректируем позицию вперед и очищаем все сообщения waitToEmit, которые находятся за зафиксированным смещением.Мы не очищаем waitToEmit, пока не настроим позицию потребителя, что, как оказывается, является проблемой.

Например, допустим, что смещение 1 не выполнено, смещения 2-10 подтверждены, а maxPollRecords равно 10. Скажемв Кафке 11 записей (1-11).Если носик возвращается к смещению 1, чтобы воспроизвести его, он получит смещения 1-10 от потребителя в опросе.Потребительская позиция теперь равна 11. Излив излучает смещение 1. Скажем, оно немедленно подтверждено.При следующем опросе носик примет смещение 1-10 и проверит, следует ли ему корректировать позицию потребителя и waitToEmit.Поскольку позиция (11) опережает зафиксированное смещение (10), оно не очищает waitToEmit.Так как waitToEmit по-прежнему содержит смещения 2-10 из предыдущего опроса, то носик в конечном итоге снова выпустит эти кортежи.

Здесь можно увидеть исправление .

...