Как реализовать потребителя точно один раз без назначения разделов вручную - PullRequest
1 голос
/ 20 сентября 2019

Я просматривал эту статью , которая объясняет, как обеспечить обработку сообщения ровно один раз, выполнив следующие действия:

  • Чтение (тема, раздел, смещение) из базы данных при запуске/ restart
  • Чтение сообщения из определенного (тема, раздел, смещение)
  • Атомно делать следующие вещи:
    • Обработка сообщения
    • Фиксировать смещение в базе данных как (тема, раздел, смещение)

Как видите, в нем явно указано, с какого раздела читать сообщения.Я считаю, что это не очень хорошая идея, поскольку она не позволяет Kafka назначать справедливую долю раздела активным потребителям.Я не могу прийти с логикой для реализации аналогичной функциональности без явного указания разделов при опросе темы kafka внутри потребителя.Можно ли это сделать?

1 Ответ

1 голос
/ 20 сентября 2019

Хороший анализ.У вас есть очень хорошее замечание, и, если возможно, вы должны, конечно, позволить kafka обработать назначение раздела для потребителей.

Существует альтернатива customer.Assign (Partition []).Брокеры kafka уведомят ваших потребителей, когда раздел будет отозван или назначен потребителю.Например, клиентская библиотека dotnet имеет обработчики SetPartitionsRevoked и SetPartitionsAssigned, которые потребители могут использовать для управления своими смещениями.

При отмене раздела сохраняйте свое последнее обработанное смещение для каждого отзываемого раздела в базе данных.Когда назначается новый раздел, получите последнее обработанное смещение для этого раздела из базы данных и используйте его.

C # Пример:

public class Program
{
   public void Main(string[] args)
   {

      using (
         var consumer = new ConsumerBuilder<string, string>(config)
                      .SetErrorHandler(ErrorHandler)
                      .SetPartitionsRevokedHandler(HandlePartitionsRevoked)
                      .SetPartitionsAssigned(HandlePartitionsAssigned)
                      .Build()
      )
      {
         while (true)
         {
            consumer.Consume()//.Poll()
         }
      }
   }

   public IEnumerable<TopicPartitionOffset> 
   HandlePartitionsRevoked
   (
      IConsumer<string, string> consumer, 
      List<TopicPartitionOffset> currentTopicPartitionOffsets
   )
   {
      Persist(<last processed offset for each partition in 
      'currentTopicPartitionOffsets'>);
      return tpos;
   }

   public IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned
   (
      IConsumer<string, string> consumer, 
      List<TopicPartition> tps
    )
   {
      List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
      return tpos
   }
}

JavaПример из документов ConsumerRebalanceListener :

Если вы пишете на Java, существует интерфейс ConsumerRebalanceListener, который вы можете реализовать.Затем вы передаете реализацию интерфейса в метод consumer.Subscribe (topic, listener).Приведенный ниже пример дословно взят из документов Кафки, связанных выше:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
       private Consumer<?,?> consumer;

       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
           this.consumer = consumer;
       }

       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // save the offsets in an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              saveOffsetInExternalStore(consumer.position(partition));
       }

       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // read the offsets from an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              consumer.seek(partition, readOffsetFromExternalStore(partition));
    }
}

Если мое понимание верно, вы бы назвали версию Java следующим образом: consumer.Subscribe("My topic", new SaveOffsetsOnRebalance(consumer)).

Для получения дополнительной информации см. Раздел «Хранение смещений вне Kafka» в kafka docs .

Вот выдержка из этих документов, в которой кратко излагается, как хранить разделы исмещения для точной однократной обработки:

Каждая запись имеет собственное смещение, поэтому для управления собственным смещением вам нужно просто сделать следующее:

  • Настроить разрешение.auto.commit = false
  • Используйте смещение, предоставляемое каждым ConsumerRecord, чтобы сохранить вашу позицию.
  • При перезапуске восстановить позицию потребителя с помощью поиска (TopicPartition, long).

Этот тип использования наиболее прост, когда назначение раздела также выполняется вручную (это может бытьв случае использования поискового индекса, описанного выше).Если назначение разделов выполняется автоматически, требуется особая осторожность в случае изменения назначений разделов.Это можно сделать, предоставив экземпляр ConsumerRebalanceListener в вызове для подписки (Collection, ConsumerRebalanceListener) и подписки (Pattern, ConsumerRebalanceListener).Например, когда разделы взяты у потребителя, потребитель захочет зафиксировать свое смещение для этих разделов, реализовав ConsumerRebalanceListener.onPartitionsRevoked (Collection).Когда разделы назначены потребителю, потребитель захочет найти смещение для этих новых разделов и правильно инициализировать потребителя на эту позицию, реализовав ConsumerRebalanceListener.onPartitionsAssigned (Collection).

Еще одним распространенным использованием ConsumerRebalanceListener являетсяочистить любые кэши, которые поддерживает приложение для разделов, которые перемещены в другое место.

...