Настройка одинакового смещения для потребителей Kafka из разных групп - PullRequest
1 голос
/ 27 июня 2019

У меня есть ServiceA, который создает DomainChangeEvents и фиксирует их в теме в kafka, затем ServiceB использует эти события из темы kafka и применяет изменения к модели чтения, хранящейся в памяти. Некоторые из DomainChangeEvent - это события сброса, а те сбрасывают домен в начальную точку. При перезапуске ServiceB я хочу прочитать ChangeEvents из последнего сброса и перестроить домен впоследствии.

ServiceB запускается в докере как реплицируемая служба.

Поскольку мне нужны все ChangeEvents в каждой реплике ServiceB, я не могу дать им один и тот же group.id, иначе сообщения будут сбалансированы, и я не получу все события во всех репликах. Как настроить ServiceB для продолжения с последнего события сброса после перезапуска?

Я попытался установить случайный group.id на ServiceB и зафиксировать сброс сообщения после его использования, но после перезапуска у меня есть другой group.id, поэтому все сообщения потребляются с самого начала снова.

Мысль о предоставлении другой конфигурации репликам Docker, но, как я читаю, служба Docker настроена одинаково во всех репликах, и это не вариант.

1 Ответ

1 голос
/ 27 июня 2019

Возможным решением было бы сохранить те точки, с которых вы хотите, чтобы ваши разные потребители начинали, с помощью фиксации вручную, например, базы данных.

Таблица, которая будет выглядеть так:

Topic  Partition  Offset

topicA 0          112
topicA 1          125
topicB 0          2313
topicB 1          2984
topicB 2          2554

Это будут ваши "последние точки сброса" или позиции, с которых ваши потребители хотят начать. Проблема с subscribe() методом, как вы правильно сказали, заключается в том, что он зависит от параметра group.id и играет в игру по перебалансировке и координации потребителя.

Чтобы получить из фиксированной точки (или набора точек в разных разделах), вы должны вместо этого позвонить на assign(). С помощью этого метода вы сможете вручную указать список разделов для своих потребителей . Нет group.id, нет динамического назначения разделов и смещения загрузки, а это то, что вам нужно.

После назначения разделов вы должны позвонить на seek(). При поиске вы сообщаете потребителю, с какого смещения вы хотите начать чтение с раздела, указанного в методе assign().

Например, чтобы начать чтение с «последних перезагрузок» из любой темы, вы должны сделать что-то вроде:

//seeking the last offset of topicA's partition0
public void setStartPosition(TopicPartition partition, long offset) 
{
     consumer.assign(Collections.singletonList(partition)); //f.e-> partition0
     consumer.seek(partition, offset);                      //f.e -> 112
}

Вызов этого метода установит вашего потребителя точно в нужной позиции в каждом разделе. Я не совсем уверен, отвечаю ли я на вашу проблему, но надеюсь, что это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...