Kafka Consumer перестает читать сообщения, когда скорость их создания слишком высока - PullRequest
0 голосов
/ 08 июня 2019

При использовании java API Kafka 1.0.0 jar-файлы представляют собой kafka_2.11-1.0.0-cp1.jar, kafka-clients-1.0.0-cp1.jar, zkclient-0.10.jar, с Confluent 4.1.0.

Мы написали и использовали потребителей, которые были стабильными в течение последних 2 лет, используя блоки кода, которые выглядят следующим образом:

val props = new Properties
props.put("zookeeper.connect",       zookeepers)
props.put("schema.registry.url",     schema_reg)
props.put("group.id",                group_name)
props.put("consumer.id",             app_name + "_lab_consumer")
props.put("consumer.timeout.ms",     "5")
props.put("auto.commit",             "true")
props.put("auto.commit.interval.ms", "1000")
props.put("auto.offset.reset",       "smallest")
props.put("backoff.increment.ms",    "5000")

val vProps : VerifiableProperties = new VerifiableProperties(props)
run_consumer = Consumer.create(new ConsumerConfig(props))
run_iterator = run_consumer.createMessageStreamsByFilter(
                 new Whitelist(lab_topic), 1,
                 new KafkaAvroDecoder(vProps),
                 new KafkaAvroDecoder(vProps)
               )(0).iterator

while (keep_looping) {
  if (run_iterator.hasNext) {
    var record : kafka.message.MessageAndMetadata[Object, Object] = null 
    var key    : String = null 
    var value  : GenericRecord = null 

    record = run_iterator.next
    key    = record.key.toString
    value  = record.message.asInstanceOf[GenericRecord]

    // do some stuff with 'value' etc
  } // if
  // do some stuff to decide if to set keep_looping=false
} // while

Но теперь наше требование увеличило скорость производства сообщений в несколько раз,Теперь темы должны обрабатывать до 500 сообщений в секунду, тогда как раньше у нас редко было более 50 сообщений в секунду.

Архитектура проста.Один производитель отправляет все сообщения в одну тему с 10 разделами.Однопоточный Потребитель читает все сообщения из темы.Это единственный потребитель в своей группе.

Мы также используем Confluent Kafka Elasticsearch-Connectors.Они образуют группу потребителей с несколькими экземплярами потребителей и не обнаруживают явных проблем при обработке увеличенной скорости передачи сообщений.

Но наш потребитель прекратит читать больше сообщений после переменного количества времени чтения всех сообщений натема.Затем он отключается и отключается, даже если тема активна и сообщения передаются через Elasticsearch-Connectors.

Мы заметили, что, когда наш Потребитель прекращает читать сообщения, создается впечатление, что тема перебалансирована.Но почему тема будет перебалансирована, если в группе только один Потребитель?(Возможно, из-за того, что Elasticsearch-Connector умирает, вызывая перебалансировку?) Перебалансировка кажется проблематичной и, по-видимому, влияет на Zookeeper способом, описанным в Потребитель Java Kafka зависает после сбоя перебалансировки , однако решение, предлагаемое дляэтот пост к нам не относится, поскольку у нас есть группа потребителей с одним потребителем.

Перебалансирование может продолжаться бесконечно, и нам нужно будет удалить тему и начать все сначала, прежде чем наш потребитель сможет подписаться на нее.снова.

Похоже ли это на ошибку в Кафке или сбой в нашем дизайне?Спасибо.

...