NodeJS: KafkaJSProtocolError: Поддерживаемые протоколы члена группы несовместимы с протоколами существующих членов. - PullRequest
0 голосов
/ 27 марта 2019

Я пытаюсь получить данные из Kafka с помощью коннектора debezium MongoDB, но получаю сообщение об ошибке при попытке прочитать их с KafkaJS:

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

Я использую докеры для захвата данных.

Вот шаги, я следую:

  1. Запуск Zookeeper

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
    
  2. начало кафки

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
    
  3. У меня MongoDB уже работает в режиме репликации

  4. Запуск дебезиума Кафка коннект

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka  debezium/connect:latest
    
  5. После публикации конфигурации разъема MongoDB

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
    
  6. При этом Если я запускаю контейнер докера-наблюдателя, я могу получать данные в формате Json на консоли

    docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
    

но я хочу захватить эти данные в приложении, чтобы я мог манипулировать ими, обрабатывать их и передавать в ElasticSearch. Для этого я использую

https://github.com/tulios/kafkajs 

Но когда я запускаю код потребителя, я получаю сообщение об ошибке. Вот пример кода

//'use strict';






// clientId=connect-1, groupId=1

const { Kafka } = require('kafkajs')



const kafka = new Kafka({

  clientId: 'connect-1',

  brokers: ['localhost:9092', 'localhost:9093']

})


// Consuming

const consumer = kafka.consumer({ groupId: '1' })



var consumeMessage = async () => {



await consumer.connect()

await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })



await consumer.run({

  eachMessage: async ({ topic, partition, message }) => {

    console.log({

      value: message.value.toString(),

    })

  },

})



}



consumeMessage();


KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

1 Ответ

0 голосов
/ 28 марта 2019

Вы не должны использовать один и тот же идентификатор группы в Connect и у вашего потребителя KafkaJS. Если вы это сделаете, они будут частью той же группы потребителей, что означает, что сообщения будут использоваться только одним или другим, если это вообще сработало.

Если вы измените groupId своего потребителя KafkaJS на что-то уникальное, оно должно работать.

Обратите внимание, что по умолчанию новая группа потребителей KafkaJS начнет потреблять с последнего смещения, поэтому она не будет потреблять уже созданные сообщения. Вы можете переопределить это поведение с помощью флага fromBeginning в вызове consumer.subscribe. Смотри https://kafka.js.org/docs/consuming#from-beginning

...