Вопрос реабаланса Кафки - PullRequest
1 голос
/ 31 января 2020

Я сталкиваюсь с некоторыми проблемами с Kafka, когда мы получаем данные о каждом продукте в виде XML из одного приложения через Kafka, а затем мы используем потоковую обработку искры, обрабатываем XML и вставляем данные в улей. Для обработки каждого продукта XML требуется около 30-40 секунд, поэтому мы установили опрос потребителей на 60 секунд, чтобы он мог опрашивать следующие записи через 60 секунд. Нам нужно будет обработать 90 000 записей о продуктах XML, и это действие будет выполняться только один раз, после чего все продукты, которые будут обновлены в моем приложении, будут только go через Kafka и попадать в озеро данных.

Поскольку в topi c имеется 14 разделов, потребитель считывает записи из разделов и выполняет работу, но если мы добавим еще один экземпляр / потребитель в группу потребителей, он начнет давать сбой и выдаст следующую ошибку.

org. apache .kafka.clients.consumer.CommitFailedException: Фиксация не может быть завершена, так как группа уже перебалансировала и присвоила разделы другому члену. Это означает, что время между последующими вызовами poll () было больше, чем настроенный session.timeout.ms, что обычно означает, что опрос l oop тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему, увеличив тайм-аут сеанса или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.

Что я понял из этой ошибки, так это то, что если она занимает много время для обработки записей, и если мы не будем долго опрашивать, тогда координатор группы может предположить, что потребитель мёртв, и активировать процесс восстановления баланса. В таких случаях происходит перебалансировка из-за того, что мы некоторое время не опрашивали, или что-то еще пошло не так, и текущий раздел забирает, а затем назначает другому потребителю, поэтому в таких случаях мы хотим зафиксировать все, что уже обработали, перед владением разделом. удаляется, и новый владелец раздела должен начать использовать его из последнего зафиксированного смещения.

Чтобы зафиксировать конкретное смещение, мы можем продолжать фиксировать промежуточное смещение вместо того, чтобы фиксировать текущее смещение в конце, и чтобы узнать, когда происходит перебалансирование, чтобы мы могли зафиксировать те, которые мы уже обработали, мы добавили слушатель ребалансировки API для потребителя, который сначала отзывает раздел, фиксирует смещения, а затем назначает его другому разделу.

API слушателя перебаланса Кафки позволяет нам указать класс слушателя перебалансировки потребителя, который имеет два метода onPartitionsRevoked и onPartitionsAssigned. Метод onPartitionsRevoked непосредственно перед тем, как убрать раздел, чтобы мы могли зафиксировать наше текущее смещение. Метод onPartitionsAssigned сразу после завершения прослушивания перебалансировки и до того, как он начнет потреблять записи из новых разделов.

Когда go -Live, нам нужно работать с 14 потребителями для параллельной обработки 90k записей, поэтому мы добавили класс слушателя перебалансировки чтобы достичь этого (код прилагается). Используя API прослушивателя ребаланса, когда мы добавляем другого потребителя, тогда будет произведено ребалансирование, потому что у нас новый потребитель, и у нас будут новые разделы для чтения, и тогда Kafka отзовет все разделы от первого потребителя, потому что список потребителей в группе изменен, и оба потребителя будут получить новые разделы, тогда каждый из них получит одинаковое количество различных разделов.

После этого та же проблема все еще сохраняется. Не могли бы вы, ребята, помочь нам решить эту проблему.

Значения конфигурации потребителя:

metri c .reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org. apache .kafka.clients.consumer.RangeAssignor] reinnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap .servers = [d.messagebroker.genmills.com:9092, d.messagebroker.genmills.com:9092] ssl.keystore.type = JKS enable.auto.commit = false sasl. Механизм = GSSAPI interceptor.classes = нуль request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org. apache .kafka.common.serialization.StringDeserializer group.id = PIM-Consumer retry.backoff.ms = 100 ssl.secure .random.implementation = null sasl.kerberos.kinit.cmd = / usr / bin / kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0,05 ssl.trustmanager.algorithm = PKIX ssl.key .password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 ключ .deserializer = class org. apache .k afka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = нулевая безопасность. протокол = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = последний

мой код:

      while (true) {
        val record = consumer.poll(60000)
        if (record != null) {
          println("record count" + record.count())
          var scalaRecords = record.asScala.toList
          if (scalaRecords.nonEmpty) {
            scalaRecords.foreach(record => {
              var messageValue = (record.value).toString
              println("start timestamp - > " + LocalDateTime.now().toString)
              try {
                if (messageValue.length >= 2 && (messageValue.charAt(0) == '"') && (messageValue.charAt(messageValue.length - 1) == '"')) {
                  var message = messageValue.substring(1, messageValue.length - 1);
                  xmltags_check(message)
                }
                else {
                  xmltags_check(messageValue)
                }
                rebalanceListner.addoffset(record.topic(), record.partition(), record.offset());
                consumer.commitSync(rebalanceListner.getCurrentoffsets());
              }
              catch {
                case e: Exception =>
                  logError(s"Failed to process Message{$messageValue} with error details{$e}")
              }
              println("end timestamp - > " + LocalDateTime.now().toString)
            }
            )
          }

        } else {
          logInfo(s"running step to data lake- in else no record found")
        }

      }
    } catch {
      case e: Exception =>
        logError(s"Failed to poll records from consumer{$e}")
    }

rebalancelistner :-


import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListner implements ConsumerRebalanceListener {
    private KafkaConsumer consumer;
    private Map<TopicPartition, OffsetAndMetadata> currentoffsets = new HashMap();

    public RebalanceListner(KafkaConsumer con){
        this.consumer=con;
    }
    public void addoffset(String topic, int partition, long offset ){
        currentoffsets.put(new TopicPartition(topic, partition),new OffsetAndMetadata(offset,"commit"));
    }
    public Map<TopicPartition, OffsetAndMetadata> getCurrentoffsets(){
        return currentoffsets;
    }
    public void onPartitionsAssigned(Collection<TopicPartition> partitions){
        System.out.println("Following Partition Assigned ..");
        for(TopicPartition  partition: partitions)
            System.out.println(partition.partition()+",");
    }
    public void onPartitionsRevoked(Collection<TopicPartition> partitions){
        System.out.println("Following Partition Revoked ..");
        for(TopicPartition  partition: partitions)
            System.out.println(partition.partition()+",");
        System.out.println("Following Partition Commited ..");
        for(TopicPartition tp: currentoffsets.keySet())
            System.out.println(tp.partition());
        consumer.commitSync(currentoffsets);
        currentoffsets.clear();
    }
}
...