При использовании java API Kafka 1.0.0 jar-файлы представляют собой kafka_2.11-1.0.0-cp1.jar, kafka-clients-1.0.0-cp1.jar, zkclient-0.10.jar, с Confluent 4.1.0.
Мы написали и использовали потребителей, которые были стабильными в течение последних 2 лет, используя блоки кода, которые выглядят следующим образом:
val props = new Properties
props.put("zookeeper.connect", zookeepers)
props.put("schema.registry.url", schema_reg)
props.put("group.id", group_name)
props.put("consumer.id", app_name + "_lab_consumer")
props.put("consumer.timeout.ms", "5")
props.put("auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("auto.offset.reset", "smallest")
props.put("backoff.increment.ms", "5000")
val vProps : VerifiableProperties = new VerifiableProperties(props)
run_consumer = Consumer.create(new ConsumerConfig(props))
run_iterator = run_consumer.createMessageStreamsByFilter(
new Whitelist(lab_topic), 1,
new KafkaAvroDecoder(vProps),
new KafkaAvroDecoder(vProps)
)(0).iterator
while (keep_looping) {
if (run_iterator.hasNext) {
var record : kafka.message.MessageAndMetadata[Object, Object] = null
var key : String = null
var value : GenericRecord = null
record = run_iterator.next
key = record.key.toString
value = record.message.asInstanceOf[GenericRecord]
// do some stuff with 'value' etc
} // if
// do some stuff to decide if to set keep_looping=false
} // while
Но теперь наше требование увеличило скорость производства сообщений в несколько раз,Теперь темы должны обрабатывать до 500 сообщений в секунду, тогда как раньше у нас редко было более 50 сообщений в секунду.
Архитектура проста.Один производитель отправляет все сообщения в одну тему с 10 разделами.Однопоточный Потребитель читает все сообщения из темы.Это единственный потребитель в своей группе.
Мы также используем Confluent Kafka Elasticsearch-Connectors.Они образуют группу потребителей с несколькими экземплярами потребителей и не обнаруживают явных проблем при обработке увеличенной скорости передачи сообщений.
Но наш потребитель прекратит читать больше сообщений после переменного количества времени чтения всех сообщений натема.Затем он отключается и отключается, даже если тема активна и сообщения передаются через Elasticsearch-Connectors.
Мы заметили, что, когда наш Потребитель прекращает читать сообщения, создается впечатление, что тема перебалансирована.Но почему тема будет перебалансирована, если в группе только один Потребитель?(Возможно, из-за того, что Elasticsearch-Connector умирает, вызывая перебалансировку?) Перебалансировка кажется проблематичной и, по-видимому, влияет на Zookeeper способом, описанным в Потребитель Java Kafka зависает после сбоя перебалансировки , однако решение, предлагаемое дляэтот пост к нам не относится, поскольку у нас есть группа потребителей с одним потребителем.
Перебалансирование может продолжаться бесконечно, и нам нужно будет удалить тему и начать все сначала, прежде чем наш потребитель сможет подписаться на нее.снова.
Похоже ли это на ошибку в Кафке или сбой в нашем дизайне?Спасибо.