Как установить IP-адрес Zookeeper в ConsumerGroup вместо IP-адреса Kafka-Host? - PullRequest
0 голосов
/ 31 мая 2019

Я должен установить IP-адрес zookeeper в ConsumerGroup вместо IP-адреса Kafka-Host. Потому что я установил коэффициент репликации, так как были созданы 3 и 3 брокера. Таким образом, если один из узлов выйдет из строя, тогда другой может вступить во владение.

Когда я попытался ввести IP-адрес zookeeper вместо IP-адреса Kafka-Host в ConsumerGroup, он не получает никаких сообщений, отправленных из API производителя.

var kafka = require('kafka-node')
var ConsumerGroup = kafka.ConsumerGroup

function createConsumerGroup () {
  var options = {
    kafkaHost: '127.0.0.1:9092',
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

  var consumerGroup = new ConsumerGroup(Object.assign({ id: 'demo-' + process.pid }, options), 'example')

  consumerGroup.on('message', function (message) {
    message.value = JSON.parse(message.value)
    console.log('Message Received')
  })
}

Я хочу, чтобы, если я передал IP-адрес zookeeper в ConsumerGroup, а не IP-адрес Kafka-Host, он должен получать сообщения, которые были отправлены из API производителя по теме «примера». И если один из брокеров дает сбой, он должен получать сообщения от другого брокера. Поскольку коэффициент репликации установлен на 3, было создано 3 брокера.

Ответы [ 2 ]

0 голосов
/ 01 июня 2019

Хорошо, проблема была в объекте опций consumerGroup.

Мы должны передать IP-адрес zookeeper в ключе "host" в объекте параметров вместо ключа "kafkaHost". Это решает проблему и получает все данные, отправленные API производителя. И даже автоматически переключаться на другой набор реплик в случае сбоя одного набора реплик.

var options = {
    kafkaHost: '127.0.0.1:9092',
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

Следующий блок кода исправит это.

var options = {
    host: '127.0.0.1:2181', // change in key & value
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

Спасибо.

0 голосов
/ 31 мая 2019

Новый потребительский API, представленный в Kafka 0.9, не требует соединения с Zookeeper. Групповая балансировка теперь выполняется самой Кафкой. Поэтому вы должны предоставить хосты Kafka вместо хостов Zookeeper.

Сообщение в блоге от Confluent должно пролить больше света:

Во время выпуска Apache Kafka версии 0.8.2, который выпустил переработан производитель клиента, мы обещали изменить дизайн потребителя клиент также. И мы сдержали свое обещание: выпуск 0.9 представляет бета-поддержка недавно переработанного потребительского клиента. На высоком уровень, основное отличие нового потребителя заключается в том, что он удаляет различие между «высокоуровневым» потребителем на основе ZooKeeper и «низкоуровневые» API SimpleConsumer, и вместо этого предлагает унифицированную потребительский API.

Этот новый потребитель реализован с использованием новой мощной серверной части. средство, которое делает групповое управление первоклассной частью Кафки протоколы. Это имеет несколько преимуществ. Во-первых, это позволяет более масштабируемое групповое средство, которое позволяет потребителям быть намного проще и тоньше и позволяет работать с большими группами восстановление равновесия. Это средство доступно для всех клиентов; работа является уже близится к завершению, чтобы использовать его в клиенте C, librdkafka. это одна и та же возможность оказывается в целом полезной для управления распределенными производство и использование данных в Кафке; это основа для Кафки Подключение, а также несколько предстоящих проектов. Наконец это завершает серия проектов, выполненных за последние несколько лет, чтобы полностью развязать Кафку клиенты из Zookeeper, таким образом, полностью удаляя клиентов клиента зависимость от ZooKeeper. Zookeeper все еще используется Kafka, но это детали реализации брокер-клиентов, которые используют этот новый У объекта нет необходимости подключаться к Zookeeper вообще. Это имеет ряд операционных преимуществ, так как клиенты теперь всегда работают через механизмы безопасности и квот, предоставляемые брокером. это значительно упрощает потребителю и открывает двери для появление первоклассных реализаций потребительского API не на Java со временем.

...