Я пытаюсь прочитать запрошенный номер сообщения kafka. Для нетранзакционных сообщений мы будем искать из endoffset - N для M разделов начинать опрос и собирать сообщения, где текущее смещение меньше конечного смещения для каждого раздела. Для идемпотентных / транзакционных сообщений мы должны учитывать маркеры транзакций / дубликаты сообщений, и значения смещений не будут непрерывными, в этом случае endoffset - N не будет возвращать N сообщений, и нам нужно будет вернуться и искать больше сообщений, пока у нас не будет N сообщенийдля каждого раздела или начальное смещение достигнуто
Поскольку существует несколько разделов, мне необходимо отслеживать все считанные смещения, чтобы я мог остановиться, когда все будет сделано. Существует два шага: первый шаг для вычисления начального смещения (конечное смещение - запрошенный номер сообщений) и конечное смещение. (смещения не являются непрерывными, есть пропуски), и я бы искал раздел для начала потребления с начального смещения. Второй шаг заключается в опросе сообщений и подсчете сообщений в каждом разделе, и если мы не встретим запрошенное количество сообщений, повторите первый и второй шаг снова, пока не встретим количество сообщений для каждого раздела.
Условия
Первоначальный опрос может не вернуть никаких записей, поэтому продолжайте опрос. Прекратите опрос, когда вы достигли конечного смещения для каждого раздела, или опрос не даст результатов. Проверьте каждый раздел на наличие сообщений, прочитанных так же, как и запрошенные сообщенияЕсли да, отметьте как выполненное, если нет, отметьте как продолжить и повторите шаги. Учитывайте пробелы в сообщениях. Должен работать как для транзакционного, так и для нетранзакционного производителя.
Вопрос:
Как мне отследить, чтобы все сообщения были прочитаны для каждого раздела и вышли из цикла? Сообщения в каждом разделе будут приходить по порядку, если это будет полезно.
Поддерживает ли Spring Kafka такой вариант использования? Более подробную информацию можно найти здесь
Обновление : я прошу прочитать последние N сообщений в каждом разделе. Разделы и нет сообщений является пользовательский ввод. Я хотел бы сохранить все управление смещением в памяти. По сути, мы пытаемся читать сообщения в порядке LIFO. Это усложняет задачу, поскольку Kafka позволяет читать вперед, а не назад.