С учетом следующего кода
case class Score(value: BigInt, random: Long = randomLong) extends Comparable[Score] {
override def compareTo(that: Score): Int = {
if (this.value < that.value) -1
else if (this.value > that.value) 1
else if (this.random < that.random) -1
else if (this.random > that.random) 1
else 0
}
override def equals(obj: _root_.scala.Any): Boolean = {
val that = obj.asInstanceOf[Score]
this.value == that.value && this.random == that.random
}
}
@tailrec
private def update(mode: UpdateMode, member: String, newScore: Score, spinCount: Int, spinStart: Long): Unit = {
// Caution: there is some subtle logic below, so don't modify it unless you grok it
try {
Metrics.checkSpinCount(member, spinCount)
} catch {
case cause: ConcurrentModificationException =>
throw new ConcurrentModificationException(Leaderboard.maximumSpinCountExceeded.format("update", member), cause)
}
// Set the spin-lock
put(member, None) match {
case None =>
// BEGIN CRITICAL SECTION
// Member's first time on the board
if (scoreToMember.put(newScore, member) != null) {
val message = s"$member: added new member in memberToScore, but found old member in scoreToMember"
logger.error(message)
throw new ConcurrentModificationException(message)
}
memberToScore.put(member, Some(newScore)) // remove the spin-lock
// END CRITICAL SECTION
case Some(option) => option match {
case None => // Update in progress, so spin until complete
//logger.debug(s"update: $member locked, spinCount = $spinCount")
for (i <- -1 to spinCount * 2) {Thread.`yield`()} // dampen contention
update(mode, member, newScore, spinCount + 1, spinStart)
case Some(oldScore) =>
// BEGIN CRITICAL SECTION
// Member already on the leaderboard
if (scoreToMember.remove(oldScore) == null) {
val message = s"$member: oldScore not found in scoreToMember, concurrency defect"
logger.error(message)
throw new ConcurrentModificationException(message)
} else {
val score =
mode match {
case Replace =>
//logger.debug(s"$member: newScore = $newScore")
newScore
case Increment =>
//logger.debug(s"$member: newScore = $newScore, oldScore = $oldScore")
Score(newScore.value + oldScore.value)
}
//logger.debug(s"$member: updated score = $score")
scoreToMember.put(score, member)
memberToScore.put(member, Some(score)) // remove the spin-lock
//logger.debug(s"update: $member unlocked")
}
// END CRITICAL SECTION
// Do this outside the critical section to reduce time under lock
if (spinCount > 0) Metrics.checkSpinTime(System.nanoTime() - spinStart)
}
}
}
Существует две важные структуры данных: memberToScore и scoreToMember. Я экспериментировал с использованием TrieMap[String,Option[Score]]
и ConcurrentHashMap[String,Option[Score]]
для memberToScore, и у обоих одинаковое поведение.
Пока мое тестирование показывает, что код верен и безопасен для потоков, но загадкой является производительность спин-блокировки. В системе с 12 аппаратными потоками и 1000 итераций по 12 фьючерсам: попадание в один и тот же элемент все время приводит к циклам вращения 50 или более, но попадание в случайное распределение членов может привести к циклам вращения 100 или более. Поведение ухудшается, если я не ослабляю вращение, не повторяя вызовы yield ().
Итак, это кажется нелогичным, я ожидал, что случайное распределение ключей приведет к меньшему вращению, чем тот же ключ, но тестирование доказывает обратное.
Может кто-нибудь предложить некоторое представление об этом нелогичном поведении?
Конечно, могут быть лучшие решения для моего дизайна, и я открыт для них, но пока я не могу найти удовлетворительного объяснения тому, что показывают мои тесты, и мое любопытство оставляет меня голодным.
Кроме того, в то время как в тесте с одним элементом нижний предел для счетчика вращений, в тесте с произвольным элементом - более низкий потолок для времени вращения, чего я и ожидал. Я просто не могу объяснить, почему тест случайного члена обычно дает более высокий потолок для подсчета вращения.