Strimzi - подключение внешних клиентов - PullRequest
1 голос
/ 05 октября 2019

Следуя обсуждению здесь , я использовал следующие шаги для включения внешнего клиента (на основе kafkajs ) для подключения к Strimzi в OpenShift. Эти шаги начинаются с здесь .

Включить внешний маршрут

kafka-persistent-single.yaml редактируется так, как показано ниже.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
          type: route
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Извлечь сертификат,

Чтобы извлечь сертификат и использовать его в клиенте, я выполнил следующую команду:

kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt

Обратите внимание, что мне пришлось использовать base64 -D в моей macOS, а не base64 -d, как показанов документации.

клиент Kafkajs

Это клиент, адаптированный со страницы npm и его документации .

const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
  ssl : { rejectUnauthorized: false,
    ca : [fs.readFileSync('ca.crt', 'utf-8')]
  }
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

Вопрос

Когда я запускаю node sample.js из папки с ca.crt, я получаю сообщение об отказе в соединении.

{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}

Чего мне не хватает?

Ответы [ 2 ]

0 голосов
/ 07 октября 2019

После расширенного обсуждения с @ppatierno я чувствую, что кластер Strimzi хорошо работает с консольными клиентами Kafka. Пакет kafkajs, с другой стороны, продолжает сбой с NOT_LEADER_FOR_PARTITION.

ОБНОВЛЕНИЕ Клиент Python *1007*, похоже, работает без суеты;Итак, я оставляю kafkajs.

0 голосов
/ 05 октября 2019

Полагаю, проблема в том, что вам не хватает нужного порта 443 в адресе брокера, поэтому вам нужно использовать

брокеров: ['my-cluster-kafka-bootstrap-messaging-os.192.168. 99.100.nip.io:443']

в противном случае он пытается подключиться к порту 80 по умолчанию на маршруте OpenShift.

...