Я использую etaty redisscala (https://github.com/etaty/rediscala) клиент. Вот моя функция
private def getVersionTime(db: RedisClient, interval: Long)(implicit ec: ExecutionContext): Future[Long] = {
import akka.util.ByteString
import redis.ByteStringFormatter
implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
def deserialize(bs: ByteString): Long = bs.utf8String.toLong
}
db.get[Long]("versionTime").map {
case Some(v) => loggerF.info(s"Retrieved version time ${v}")
v
case None => val current = System.currentTimeMillis()
db.setex[Long]("versionTime", (current / 1000) + interval, current)
loggerF.info(s"set version time ${current}")
current
}
}
Вот мой тест. Этот тест вызывает метод выше
it("check with multiple tasks"){
val target = 10
val latch = new java.util.concurrent.CountDownLatch(target)
(1 to target).map{t =>
getVersionTime(prodDb, 10).map{r => print("\n" + r); latch.countDown()}
}
assert(latch.await(10, TimeUnit.SECONDS))
}
Вывод теста
14: 52: 46.692 [pool-1-thread-12] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.693 [pool-1-thread-6] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.693 [pool-1-thread-20] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.692 [pool-1-thread-2] INFO EndToEndITTests - установить время версии 1548062566686
14: 52: 46.692 [pool-1-thread-10] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.693 [pool-1-thread-8] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.692 [pool-1-thread-4] INFO EndToEndITTests - установить время версии 1548062566686
14: 52: 46.692 [pool-1-thread-11] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.692 [pool-1-thread-9] INFO EndToEndITTests - установить время версии 1548062566687
14: 52: 46.692 [pool-1-thread-7] INFO EndToEndITTests - установить время версии 1548062566687
Ожидаемое поведение: установленное время версии должно появиться один раз, а для остальных потоков Время распечатанной версии должно быть напечатано. Я думаю, что мне нужно использовать транзакцию здесь, чтобы получить и setex, завернутый в часы и exec
private def getVersionTimeTrans(db: RedisClient, interval: Long): Long = {
import akka.util.ByteString
import redis.ByteStringFormatter
implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
def deserialize(bs: ByteString): Long = bs.utf8String.toLong
}
val redisTransaction = db.transaction()
redisTransaction.watch("versionTime")
val result: Future[Long] = redisTransaction.get[Long]("versionTime").map {
case Some(v) => loggerF.info(s"Retrieved version time ${v}")
v
case None => val current = System.currentTimeMillis()
redisTransaction.setex[Long]("versionTime", (current / 1000) + interval, current)
loggerF.info(s"set version time ${current}")
current
}
redisTransaction.exec()
val r = for {
i <- result
} yield {
i
}
Await.result(r, 10 seconds)
}
тест
it("check with multiple threads "){
val target = 10
val latch = new java.util.concurrent.CountDownLatch(target)
(1 to target).map{t =>
Future(getVersionTimeTrans(prodDb, 10)).map{r => latch.countDown()}
}
assert(latch.await(10, TimeUnit.SECONDS))
}
Для этого теста тоже вывод одинаковый. Я не мог понять, как правильно обернуть его внутри транзакции. Пожалуйста, помогите.