Я управляю кластером 4 брокеров Kafka в Kubernetes.Коэффициент репликации равен 3, а ISR равен 2.
Кроме того, есть служба производителя (работающая в Spring Stream), генерирующая сообщения, и служба чтения потребителя из этой темы.Теперь я попытался обновить кластер Kafka с помощью непрерывного обновления, надеясь на отсутствие простоев, но во время обновления журнал производителя был заполнен этой ошибкой:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Согласно моим расчетам, когда 1 брокервниз не должно быть проблем, потому что минимальный ISR равен 2. Однако, похоже, что служба производителя не знает о текущем обновлении и продолжает отправлять сообщения одному и тому же брокеру ...
Есть идеи, какрешить это?
Это мой kafka.yaml
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
name: kafka
namespace: default
labels:
app: kafka
spec:
serviceName: kafka
replicas: 4
updateStrategy:
type: RollingUpdate
template:
metadata:
labels:
app: kafka
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9308"
spec:
nodeSelector:
middleware.node: "true"
imagePullSecrets:
- name: nexus-registry
terminationGracePeriodSeconds: 300
containers:
- name: kafka
image: kafka:2.12-2.1.0
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 3000m
memory: 1800Mi
requests:
cpu: 2000m
memory: 1800Mi
env:
# Replication
- name: KAFKA_DEFAULT_REPLICATION_FACTOR
value: "3"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
- name: KAFKA_MIN_INSYNC_REPLICAS
value: "2"
# Protocol Version
- name: KAFKA_INTER_BROKER_PROTOCOL_VERSION
value: "2.1"
- name: KAFKA_LOG_MESSAGE_FORMAT_VERSION
value: "2.1"
- name: ENABLE_AUTO_EXTEND
value: "true"
- name: KAFKA_DELETE_TOPIC_ENABLE
value: "true"
- name: KAFKA_RESERVED_BROKER_MAX_ID
value: "999999999"
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
value: "true"
- name: KAFKA_PORT
value: "9092"
- name: KAFKA_ADVERTISED_PORT
value: "9092"
- name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
value: "10"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "3"
- name: KAFKA_LOG_RETENTION_BYTES
value: "1800000000000"
- name: KAFKA_ADVERTISED_HOST_NAME
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_OFFSETS_RETENTION_MINUTES
value: "10080"
- name: KAFKA_ZOOKEEPER_CONNECT
valueFrom:
configMapKeyRef:
name: zk-config
key: zk.endpoints
- name: KAFKA_LOG_DIRS
value: /kafka/kafka-logs
ports:
- name: kafka
containerPort: 9092
- name: prometheus
containerPort: 7071
volumeMounts:
- name: data
mountPath: /kafka
readinessProbe:
tcpSocket:
port: 9092
timeoutSeconds: 1
failureThreshold: 12
initialDelaySeconds: 10
periodSeconds: 30
successThreshold: 1
- name: kafka-exporter
image: danielqsj/kafka-exporter:latest
resources:
requests:
cpu: 100m
memory: 100Mi
limits:
cpu: 500m
memory: 500Mi
ports:
- containerPort: 9308
volumeClaimTemplates:
- metadata:
name: data
labels:
app: kafka
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2000Gi