Сводка
Документы и комментарии к кодам для Kafka предполагают, что, когда для параметра acks
производителя задано значение all
, подтверждение будет отправлено производителю только в том случае, если все реплики в синхронизации синхронизируются , но код (Partition.Scala
, checkEnoughReplicasReachOffset
), похоже, предполагает, что подтверждение отправляется, как только реплики min синхронизируются .
Подробности
Документы kafka имеют следующее:
acks = all Это означает, что лидер будет ждать полного наборасинхронные реплики для подтверждения записи. source
Также, глядя на исходный код Kafka - partition.scala
checkEnoughReplicasReachOffset()
имеет следующий комментарий (выделено мной):
Примечаниечто этот метод будет вызываться только в том случае, если requiredAcks = -1, и мы ожидаем, что все реплики в ISR будут полностью обработаны до (локального) смещения лидера, соответствующего этому запросу на производство, прежде чем мы подтвердим запрос на производство..
Наконец, этот ответ о переполнении стека (снова выделение мое)
Кроме того, минимальная синхронная реплика задает минимальное числореплики, которые должны быть синхронизированы, чтобы раздел оставался доступным для записи.Когда производитель указывает ack (-1 / all config), он все еще будет ожидать подтверждения от все в синхронизирующих репликах в этот момент (независимо от настройки минимальных синхронизирующих реплик).
Но когда я смотрю на код в Partition.Scala (примечание minIsr < curInSyncReplicas.size
):
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
...
val minIsr = leaderReplica.log.get.config.minInSyncReplicas
if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
if (minIsr <= curInSyncReplicas.size)
(true, Errors.NONE)
Код, который вызывает это, возвращает ack:
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
Таккод выглядит так, как будто он возвращает подтверждение, как только набор реплик в синхронизации превышает минимальные в синхронизации реплики.Тем не менее, документация и комментарии предполагают, что подтверждение отправляется только после того, как все синхронизированные реплики были обнаружены.Что мне не хватает?По крайней мере, комментарий выше checkEnoughReplicasReachOffset
выглядит так, как будто его нужно изменить.