Ваши операции не заблокированы должным образом или не являются атомарными. Хотя один поток, возможно, только что снял блокировку с вашей карты, два других могут одновременно выйти из своих соответствующих циклов while.
Кроме того, пустые бесконечные циклы while создают ненужную нагрузку на ваши ресурсы.
Я заменил вашу карту блокировки на карту, которая содержит Future
для каждого пути. Карта обновляется исключительно атомарным методом compute
. Если путь никогда не был написан прежде, чем мы получим null
и создадим наше первое будущее записи. Если, с другой стороны, путь был записан ранее, мы можем просто связать нашу новую операцию записи. Я сделал это с .map
, который позволяет обрабатывать ошибки, если это необходимо.
См. SO: будущее цепочка для получения дополнительной информации о .map
против .andThen
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object concurrent {
val writeOperations: ConcurrentMap[String, Future[_]] = new ConcurrentHashMap[String, Future[_]]()
def writeToFile(path: String): Unit = {
val writeOp = writeOperations.compute(path, (path, future) => {
if (future == null) {
Future {
println("write to file " + path)
Thread.sleep(2000)
}
} else {
future.map(x => { // you can use x for error handling of previous write operation
println("write to file " + path)
Thread.sleep(2000)
})
}
})
}
def main(args: Array[String]): Unit = {
val futures = Seq(
Future {
writeToFile("1")
}, Future {
writeToFile("1")
}, Future {
writeToFile("1")
}
)
futures.foreach(Await.ready(_, Duration.Inf))
writeOperations.values().forEach( x => Await.ready(x, Duration.Inf))
}
}