У меня есть тема kafka с фактором репликации 3 и min.insync.replicas = 2
, продюсер, который отправляет X сообщений в эту тему с acks=all
.
Через некоторое время (в течение 1 минуты) после всех сообщений, отправленных в тему, для этой темы создается новый потребитель с помощью клиента java kafka. С помощью метода consumer.endOffsets()
извлекаются конечные смещения всех разделов kafka для этой темы. Другой вызов того же метода consumer.endOffsets
иногда возвращает разные конечные смещения для некоторых разделов.
В этой настройке нет новых сообщений, отправляемых в тему kafka после создания потребителя.
Согласно Java документам endOffsets
:
/**
* Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming
* message, i.e. the offset of the last available message + 1. If messages have never been written
* to the the partition, the offset returned will be 0.
*
* <p>
* This method does not change the current consumer position of the partitions.
* <p>
* When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
* This is the offset of the first message with an open transaction. The LSO moves forward as transactions
* are completed.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
endOffsets возвращает последнее стабильное смещение (LSO), которое подтверждается всеми репликами.
Почему иногда (не очень часто) конечные смещения изменяются при последующем вызове этого метода?
Ожидается ли поведение, что endOffsets в конечном итоге непротиворечивы? Ошибка?