Kafka LogSegment метод добавления путаницы - PullRequest
0 голосов
/ 04 мая 2020
/**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed.
   *
   * It is assumed this method is being called from within a lock.
   *
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   * @return the physical position in the file of the appended records
   * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
   */
  @nonthreadsafe
  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {//**Question1
    if (records.sizeInBytes > 0) {//ensure the message set to be written is not empty
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()//get the physical position of the .log file
      if (physicalPosition == 0)//if the .log is empty which indicates this LogSegement is empty

        rollingBasedTimestamp = Some(largestTimestamp)//update the timestamp

      ensureOffsetInRange(largestOffset)//ensure the largestOffset of the message set is in the valid,
// which means largestOffset - baseOffset in the range of \[0,Int.MAXVALUE\].

      // append the messages
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      if (largestTimestamp > maxTimestampSoFar) {//** Question2
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

У меня две путаницы.

  1. Первый находится на месте Вопроса 1, в какой ситуации largestOffset будет отличаться от shallowOffsetOfMaxTimestamp, поскольку большая часть проверки проверяет largestOffset, но только место Вопроса 2 использует shallowOffsetOfMaxTimestamp.

  2. В том же месте Вопроса 2 находится if (largestTimestamp > maxTimestampSoFar), мне интересно, почему когда нужно это условие? Кроме того, я нашел здесь определение поля

 /* The maximum timestamp we see so far */
  @volatile private var _maxTimestampSoFar: Option[Long] = None
  def maxTimestampSoFar_=(timestamp: Long): Unit = _maxTimestampSoFar = Some(timestamp)
  def maxTimestampSoFar: Long = {
    if (_maxTimestampSoFar.isEmpty)
      _maxTimestampSoFar = Some(timeIndex.lastEntry.timestamp)
    _maxTimestampSoFar.get
  }

Поскольку официальная команда говорит, что этот метод append не является потокобезопасным, и его предполагается использовать в блокировке , Это вызывает еще большую путаницу, поскольку предполагается, что этот метод вызывается в блокировке, тогда, похоже, не может быть ситуации, когда offsetOfMaxTimestampSoFar может иметь другое значение в разное время или в другом потоке. Я полагаю, что добавление сообщений в LogSegment происходит в порядке времени, а затем, если набор сообщений должен добавляться к LogSegment ,, тогда как largestTimestamp <= <code>offsetOfMaxTimestampSoFar?

Может ли кто-нибудь привести пример ситуации, когда строки кода, приводящие меня в замешательство, имеют свою необходимость. Большое спасибо заранее. Еще одна вещь, которую нужно добавить, это то, что я читаю ветку Kafka из репозитория Kafka GitHub, которая должна отслеживать версию 2.5.

...