В целях тестирования я пытаюсь создать кластер Kafka на локальном мини-кубе. Кластер должен быть доступен снаружи Кубернетов.
Когда я производю / потребляю из капсул, проблем нет, все работает просто отлично.
Когда я делаю из своей локальной машины с
bin/kafka-console-producer.sh --topic mytopic --broker-list 192.168.99.100:32767
, где 192.168.99.100 - это мой minikube-ip, а 32767 - это порт узла службы kafka.
Я получаю следующее сообщение об ошибке:
>testmessage
>[2018-04-30 11:55:04,604] ERROR Error when sending message to topic ams_stream with key: null, value: 11 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ams_stream-0: 1506 ms has passed since batch creation plus linger time
Когда я использую свой локальный компьютер, я получаю следующие предупреждения:
[2018-04-30 10:22:30,680] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:23:46,057] WARN Connection to node 8 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:25:01,542] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:26:17,008] WARN Connection to node 5 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Идентификаторы брокера верны, поэтому, похоже, я могу хотя бы связаться с брокерами
Edit:
Я думаю, что проблема может заключаться в том, что служба направляет меня «случайным образом» любому из моих брокеров, но он должен направить меня к руководителю темы.
Может ли это быть проблемой? Кто-нибудь знает способ обойти эту проблему?
Дополнительная информация:
Я использую wurstmeister / kafka и digitalwonderland / zookeeper images
Я начал использовать Учебное пособие по DellEMC (и связанное с defuze.org )
Это не сработало для меня, поэтому я внес некоторые изменения в kafka-service.yml (1) и kafka-cluster.yml (2)
Кафка-service.yml
- добавлен фиксированный NodePort
- удален идентификатор из селектора
Кафка-cluster.yml
- добавлены реплики к спецификации
- удален идентификатор с метки
- изменил идентификатор брокера для генерации по последнему номеру с IP-адреса
- заменил устаревшие значения advertised_host_name / advertised_port на
- слушателей (pod-ip: 9092) для связи внутри k8s
- advertised_listeners (minikube-ip: node-port) для связи с приложениями за пределами kubernetes
1 - kafka-service.yml:
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service
labels:
name: kafka
spec:
type: NodePort
ports:
- port: 9092
nodePort: 32767
targetPort: 9092
protocol: TCP
selector:
app: kafka
type: LoadBalancer
2 - kafka-cluster.yml:
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
name: kafka-b
spec:
replicas: 3
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: wurstmeister/kafka
ports:
- containerPort: 9092
env:
- name: HOSTNAME_COMMAND
value: "ifconfig |grep 'addr:172' |cut -d':' -f 2 |cut -d ' ' -f 1"
- name: KAFKA_ZOOKEEPER_CONNECT
value: zk1:2181
- name: BROKER_ID_COMMAND
value: "ifconfig |grep 'inet addr:172' | cut -d'.' -f '4' | cut -d' ' -f '1'"
- name: KAFKA_ADVERTISED_LISTENERS
value: "INTERNAL://192.168.99.100:32767"
- name: KAFKA_LISTENERS
value: "INTERNAL://_{HOSTNAME_COMMAND}:9092"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "INTERNAL:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "INTERNAL"
- name: KAFKA_CREATE_TOPICS
value: mytopic:1:3