channel.asFlux (), кажется, вызывает тупик в сценарии высокого параллелизма - PullRequest
0 голосов
/ 30 июня 2019

Хорошо, я отредактировал весь вопрос и выпустил две версии кода, которые кажутся более интересными, чтобы выявить проблему .. Здесь есть 2 функции, одна вызывает взаимоблокировку, другая нет .. Проблема, кажется, внутри channel.asFlux() преобразование, но я не понимаю, как лучше избегать таких замков ..

спасибо, Франческо


    @Test
    @Disabled("investigating..")
    fun tt() = runBlocking<Unit>{
        (0..3000).map { async {
            //noDeadLock()
            yesDeadLock()
        }}.forEach { it.await()}
    }


    val thPool = newSingleThreadContext("single")
    suspend fun yesDeadLock(){
        withContext(Dispatchers.IO){ DEADLOCK
            var channel = Channel<Int>(Channel.RENDEZVOUS)
            val producer =
                launch(newSingleThreadContext("nconte")){
                    while (isActive  && !channel.isClosedForSend && !channel.isClosedForReceive ){
                        try{
                            channel.send(1)
                        }catch (t:Throwable){
                        }
                    }
                }

            withContext(Dispatchers.Default){ 
            // withContext(thPool){  // <<-- with any other context, even a single thread, this long cause deadlocks
                channel.asFlux()
                    .publishOn(Schedulers.elastic(),1)
                    .doFinally {  producer.cancel() }
                    .limitRate(1)
                    .take(30)
                    .blockLast()
            }
            1
        }
    }

    suspend fun noDeadLock(){
        withContext(Dispatchers.Default){ 
            var channel = Channel<Int>(Channel.RENDEZVOUS)
            val producer =
                launch(newSingleThreadContext("nconte")){
                    while (isActive  && !channel.isClosedForSend && !channel.isClosedForReceive ){
                        try{
                            channel.send(1)
                        }catch (t:Throwable){
                        }
                    }
                }

            withContext(Dispatchers.IO){ 
                channel.asFlux()
                    .publishOn(Schedulers.elastic(),1)
                    .doFinally {  producer.cancel() }
                    .limitRate(1)
                    .take(30)
                    .blockLast()
            }
            1
        }
    }

...