Kafka - Потеря сообщений, даже если приложение настроено ровно один раз и обеспечивает максимальную надежность - PullRequest
1 голос
/ 27 февраля 2020

Бывают случаи (очень редко, но есть), когда я получаю дубликаты, даже если все настроено на высокую стойкость и мы используем конфигурацию ровно один раз.

Пожалуйста, проверьте ниже контекст приложения и тестовый сценарий, что вызывает эту проблему.

Настройка кластера Kafka

3 брокера Kafka (1 на хосте 1, 2 на хосте 2 и 3 на хосте 3)

3 x экземпляра Zookeeper (1 на хосте 1 , 2 на хосте 2 и 3 на хосте 3)

Конфигурация Kafka


    broker.id=1,2,3

    num.network.threads=2

    num.io.threads=8

    socket.send.buffer.bytes=102400

    socket.receive.buffer.bytes=102400

    socket.request.max.bytes=104857600

    log.dirs=/home/kafka/logs/kafka

    min.insync.replicas=3

    transaction.state.log.min.isr=3

    default.replication.factor=3

    log.retention.minutes=600

    log.segment.bytes=1073741824

    log.retention.check.interval.ms=300000

    zookeeper.connect=host1:2181,host2:2181,host3:2181

    zookeeper.connection.timeout.ms=6000

    group.initial.rebalance.delay.ms=1000

    log.message.timestamp.type=LogAppendTime

    delete.topic.enable=true

    auto.create.topics.enable=false

    unclean.leader.election.enable=false

Конфигурация ZooKeeper


    tickTime=2000

    dataDir=/home/kafka/logs/zk

    clientPort=2181

    maxClientCnxns=0

    initLimit=5

    syncLimit=2

    server.1=host1:2888:3888

    server.2=host2:2888:3888

    server.3=host3:2888:3888

    autopurge.snapRetainCount=3

    autopurge.purgeInterval=24

Описание внутренних тем Kafka

Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
      Topic: __transaction_state     Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
​
Topic:__consumer_offsets       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
      Topic: __consumer_offsets       Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3

Темы приложений


    Topic input-event
    Topic:input-event     PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
          Topic: input-event     Partition: 0   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
          Topic: input-event     Partition: 1   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
          Topic: input-event     Partition: 2   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

    Topic output-event
    Topic:output-event       PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
          Topic: output-event       Partition: 0   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
          Topic: output-event       Partition: 1   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
          Topic: output-event       Partition: 2   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

Свойства потребителя приложения


    o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
                  auto.commit.interval.ms = 5000
                  auto.offset.reset = earliest
                  bootstrap.servers = [host1:9092, host2:9092, host3:9092]
                  check.crcs = true
                  client.id =
                  connections.max.idle.ms = 540000
                  default.api.timeout.ms = 60000
                  enable.auto.commit = false
                  exclude.internal.topics = true
                  fetch.max.bytes = 134217728
                  fetch.max.wait.ms = 500
                  fetch.min.bytes = 1
                  group.id = groupId
                  heartbeat.interval.ms = 3000
                  interceptor.classes = []
                  internal.leave.group.on.close = true
                  isolation.level = read_committed
                  key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
                  max.partition.fetch.bytes = 134217728
                  max.poll.interval.ms = 300000
                  max.poll.records = 1
                  metadata.max.age.ms = 300000
                  metric.reporters = []
                  metrics.num.samples = 2
                  metrics.recording.level = INFO
                  metrics.sample.window.ms = 30000
                  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
                  receive.buffer.bytes = 65536
                  reconnect.backoff.max.ms = 1000
                  reconnect.backoff.ms = 1000
                  request.timeout.ms = 30000
                  retry.backoff.ms = 1000
                  sasl.client.callback.handler.class = null
                  sasl.jaas.config = null
                  sasl.kerberos.kinit.cmd = /usr/bin/kinit
                  sasl.kerberos.min.time.before.relogin = 60000
                  sasl.kerberos.service.name = null
                  sasl.kerberos.ticket.renew.jitter = 0.05
                  sasl.kerberos.ticket.renew.window.factor = 0.8
                  sasl.login.callback.handler.class = null
                  sasl.login.class = null
                  sasl.login.refresh.buffer.seconds = 300
                  sasl.login.refresh.min.period.seconds = 60
                  sasl.login.refresh.window.factor = 0.8
                  sasl.login.refresh.window.jitter = 0.05
                  sasl.mechanism = GSSAPI
                  security.protocol = PLAINTEXT
                  send.buffer.bytes = 131072
                  session.timeout.ms = 10000
                  ssl.cipher.suites = null
                  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                  ssl.endpoint.identification.algorithm = https
                  ssl.key.password = null
                  ssl.keymanager.algorithm = SunX509
                  ssl.keystore.location = null
                  ssl.keystore.password = null
                  ssl.keystore.type = JKS
                  ssl.protocol = TLS
                  ssl.provider = null
                  ssl.secure.random.implementation = null
                  ssl.trustmanager.algorithm = PKIX
                  ssl.truststore.location = null
                  ssl.truststore.password = null
                  ssl.truststore.type = JKS
                  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Свойства производителя приложения

    bootstrapServers = "host1, host2, host3"
    transactionIdPrefix = "my-producer-"${instance}"
    "enable.idempotence" = "true"
    "acks" = "all"
    "retries" = "2147483647"
    "transaction.timeout.ms" = "10000"
    "max.in.flight.requests.per.connection" = "1"
    "reconnect.backoff.max.ms" = "1000"
    "reconnect.backoff.ms" = "1000"
    "retry.backoff.ms" = "1000"

Обработка приложения фиксирует

Используя KafkaTransactionManager, мы запускаем транзакцию , напишите сообщение для вывода topi c, используя KafkaTemplate, а также отправьте смещения потребителя (spring-kafka 2.2.8.RELEASE).

Ожидаемый / фактический тест

  • Введите 32 000 сообщений для ввода topi c

  • Запуск 3 экземпляров приложения

  • Запуск обработки сообщений одно за другим (max.poll.records = 1)

  • Во время обработки, отправьте SIGKILL (kill -9) параллельно с брокерами Кафки host1 и host2 50 раз.

  • Подождите 60 секунд

  • Во время обработки, отправляйте SIGKILL (kill -9) параллельно с брокерами Кафки host1 и host3 50 раз.

  • Подождите 60 секунд

  • Во время обработки, отправьте SIGKILL (kill -9) параллельно на host2 и host3 Kafka Brokers 50 раз.

Ожидание будет иметь должно было иметь 32 000 сообщений на выходе topi c, однако, иногда мы фактически получаем дубликат (по крайней мере, один).

Бывают случаи, когда мы получаем 32 000 сообщений, и все правильно.

1 Ответ

0 голосов
/ 02 марта 2020

Проблема была связана с тем, что идентификатор транзакции был неправильно установлен на уровне topi c .partition, и у нас было два производителя, которые дважды писали одно и то же сообщение для одного и того же раздела.

Это было хорошее чтение: https://tgrez.github.io/posts/2019-04-13-kafka-transactions.html

...