Хорошо, я отредактировал весь вопрос и выпустил две версии кода, которые кажутся более интересными, чтобы выявить проблему .. Здесь есть 2 функции, одна вызывает взаимоблокировку, другая нет .. Проблема, кажется, внутри channel.asFlux()
преобразование, но я не понимаю, как лучше избегать таких замков ..
спасибо, Франческо
@Test
@Disabled("investigating..")
fun tt() = runBlocking<Unit>{
(0..3000).map { async {
//noDeadLock()
yesDeadLock()
}}.forEach { it.await()}
}
val thPool = newSingleThreadContext("single")
suspend fun yesDeadLock(){
withContext(Dispatchers.IO){ DEADLOCK
var channel = Channel<Int>(Channel.RENDEZVOUS)
val producer =
launch(newSingleThreadContext("nconte")){
while (isActive && !channel.isClosedForSend && !channel.isClosedForReceive ){
try{
channel.send(1)
}catch (t:Throwable){
}
}
}
withContext(Dispatchers.Default){
// withContext(thPool){ // <<-- with any other context, even a single thread, this long cause deadlocks
channel.asFlux()
.publishOn(Schedulers.elastic(),1)
.doFinally { producer.cancel() }
.limitRate(1)
.take(30)
.blockLast()
}
1
}
}
suspend fun noDeadLock(){
withContext(Dispatchers.Default){
var channel = Channel<Int>(Channel.RENDEZVOUS)
val producer =
launch(newSingleThreadContext("nconte")){
while (isActive && !channel.isClosedForSend && !channel.isClosedForReceive ){
try{
channel.send(1)
}catch (t:Throwable){
}
}
}
withContext(Dispatchers.IO){
channel.asFlux()
.publishOn(Schedulers.elastic(),1)
.doFinally { producer.cancel() }
.limitRate(1)
.take(30)
.blockLast()
}
1
}
}