Хороший анализ.У вас есть очень хорошее замечание, и, если возможно, вы должны, конечно, позволить kafka обработать назначение раздела для потребителей.
Существует альтернатива customer.Assign (Partition []).Брокеры kafka уведомят ваших потребителей, когда раздел будет отозван или назначен потребителю.Например, клиентская библиотека dotnet имеет обработчики SetPartitionsRevoked и SetPartitionsAssigned, которые потребители могут использовать для управления своими смещениями.
При отмене раздела сохраняйте свое последнее обработанное смещение для каждого отзываемого раздела в базе данных.Когда назначается новый раздел, получите последнее обработанное смещение для этого раздела из базы данных и используйте его.
C # Пример:
public class Program
{
public void Main(string[] args)
{
using (
var consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler(ErrorHandler)
.SetPartitionsRevokedHandler(HandlePartitionsRevoked)
.SetPartitionsAssigned(HandlePartitionsAssigned)
.Build()
)
{
while (true)
{
consumer.Consume()//.Poll()
}
}
}
public IEnumerable<TopicPartitionOffset>
HandlePartitionsRevoked
(
IConsumer<string, string> consumer,
List<TopicPartitionOffset> currentTopicPartitionOffsets
)
{
Persist(<last processed offset for each partition in
'currentTopicPartitionOffsets'>);
return tpos;
}
public IEnumerable<TopicPartitionOffset> HandlePartitionsAssigned
(
IConsumer<string, string> consumer,
List<TopicPartition> tps
)
{
List<TopicPartitionOffset> tpos = FetchOffsetsFromDbForTopicPartitions(tps);
return tpos
}
}
JavaПример из документов ConsumerRebalanceListener :
Если вы пишете на Java, существует интерфейс ConsumerRebalanceListener, который вы можете реализовать.Затем вы передаете реализацию интерфейса в метод consumer.Subscribe (topic, listener).Приведенный ниже пример дословно взят из документов Кафки, связанных выше:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
private Consumer<?,?> consumer;
public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
this.consumer = consumer;
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// save the offsets in an external store using some custom code not described here
for(TopicPartition partition: partitions)
saveOffsetInExternalStore(consumer.position(partition));
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// read the offsets from an external store using some custom code not described here
for(TopicPartition partition: partitions)
consumer.seek(partition, readOffsetFromExternalStore(partition));
}
}
Если мое понимание верно, вы бы назвали версию Java следующим образом: consumer.Subscribe("My topic", new SaveOffsetsOnRebalance(consumer))
.
Для получения дополнительной информации см. Раздел «Хранение смещений вне Kafka» в kafka docs .
Вот выдержка из этих документов, в которой кратко излагается, как хранить разделы исмещения для точной однократной обработки:
Каждая запись имеет собственное смещение, поэтому для управления собственным смещением вам нужно просто сделать следующее:
- Настроить разрешение.auto.commit = false
- Используйте смещение, предоставляемое каждым ConsumerRecord, чтобы сохранить вашу позицию.
- При перезапуске восстановить позицию потребителя с помощью поиска (TopicPartition, long).
Этот тип использования наиболее прост, когда назначение раздела также выполняется вручную (это может бытьв случае использования поискового индекса, описанного выше).Если назначение разделов выполняется автоматически, требуется особая осторожность в случае изменения назначений разделов.Это можно сделать, предоставив экземпляр ConsumerRebalanceListener в вызове для подписки (Collection, ConsumerRebalanceListener) и подписки (Pattern, ConsumerRebalanceListener).Например, когда разделы взяты у потребителя, потребитель захочет зафиксировать свое смещение для этих разделов, реализовав ConsumerRebalanceListener.onPartitionsRevoked (Collection).Когда разделы назначены потребителю, потребитель захочет найти смещение для этих новых разделов и правильно инициализировать потребителя на эту позицию, реализовав ConsumerRebalanceListener.onPartitionsAssigned (Collection).
Еще одним распространенным использованием ConsumerRebalanceListener являетсяочистить любые кэши, которые поддерживает приложение для разделов, которые перемещены в другое место.