/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock.
*
* @param largestOffset The last offset in the message set
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
* @return the physical position in the file of the appended records
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/
@nonthreadsafe
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {//**Question1
if (records.sizeInBytes > 0) {//ensure the message set to be written is not empty
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()//get the physical position of the .log file
if (physicalPosition == 0)//if the .log is empty which indicates this LogSegement is empty
rollingBasedTimestamp = Some(largestTimestamp)//update the timestamp
ensureOffsetInRange(largestOffset)//ensure the largestOffset of the message set is in the valid,
// which means largestOffset - baseOffset in the range of \[0,Int.MAXVALUE\].
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {//** Question2
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
У меня две путаницы.
Первый находится на месте Вопроса 1, в какой ситуации largestOffset
будет отличаться от shallowOffsetOfMaxTimestamp
, поскольку большая часть проверки проверяет largestOffset
, но только место Вопроса 2 использует shallowOffsetOfMaxTimestamp
.
В том же месте Вопроса 2 находится if (largestTimestamp > maxTimestampSoFar)
, мне интересно, почему когда нужно это условие? Кроме того, я нашел здесь определение поля
/* The maximum timestamp we see so far */
@volatile private var _maxTimestampSoFar: Option[Long] = None
def maxTimestampSoFar_=(timestamp: Long): Unit = _maxTimestampSoFar = Some(timestamp)
def maxTimestampSoFar: Long = {
if (_maxTimestampSoFar.isEmpty)
_maxTimestampSoFar = Some(timeIndex.lastEntry.timestamp)
_maxTimestampSoFar.get
}
Поскольку официальная команда говорит, что этот метод append
не является потокобезопасным, и его предполагается использовать в блокировке , Это вызывает еще большую путаницу, поскольку предполагается, что этот метод вызывается в блокировке, тогда, похоже, не может быть ситуации, когда offsetOfMaxTimestampSoFar
может иметь другое значение в разное время или в другом потоке. Я полагаю, что добавление сообщений в LogSegment происходит в порядке времени, а затем, если набор сообщений должен добавляться к LogSegment
,, тогда как largestTimestamp
<= <code>offsetOfMaxTimestampSoFar?
Может ли кто-нибудь привести пример ситуации, когда строки кода, приводящие меня в замешательство, имеют свою необходимость. Большое спасибо заранее. Еще одна вещь, которую нужно добавить, это то, что я читаю ветку Kafka из репозитория Kafka GitHub, которая должна отслеживать версию 2.5.