Почему возникает ошибка «блокировка, которая не поддерживается в потоке response-http-nio-4», несмотря на использование Mono.fromCallable ()? - PullRequest
0 голосов
/ 09 июля 2020

Я согласен с тем, что реактивный код не должен блокироваться, но ...

В следующем методе findTenantStorageFactory () мне нужно заблокировать tenantId, чтобы метод мог вернуть указанный c Factory для арендатора. Я нашел инструкции из Project Reactor, что мне нужно обернуть метод в Mono.fromCallable () и использовать планировщик boundedElasti c (). Однако, когда я запускаю этот код в отладчике, я получаю ужасную ошибку: «block () / blockFirst () / blockLast () блокируются, что не поддерживается». Я перепробовал миллион идей, но ничего не добился. Буду признателен за ваши предложения.

//////////////////////////////////////////////////////////////////////////
fun findTenantStorageFactory(tenantId: Mono<TenantId>):
  Mono<Either<BaseAzureBlobStorageException, MultitenantAzureBlobStorageFactory>> {
    val myValue: Callable<Either<
                   BaseAzureBlobStorageException, 
                   MultitenantAzureBlobStorageFactory>> =

    Callable { lookupTenantBlobStorageFactory(tenantId.block()!!, factories) }

    return Mono.fromCallable<Either<
                  BaseAzureBlobStorageException, 
                  MultitenantAzureBlobStorageFactory>>(myValue)
               .subscribeOn(Schedulers.boundedElastic())
    }

//////////////////////////////////////////////////////////////////////////
private val blobStorageClientBuilder: AzureBlobClientBuilder
 get() = 
   findTenantStorageFactory(tenantId).block()!!
     .fold({ throw it }, { it.blobStorageClient })!!
/////////////////////////////// ERROR ///////////////////////////////////
    java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0-M1.jar:3.4.0-M1]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 

1 Ответ

0 голосов
/ 10 июля 2020

Как вы думаете, почему нужно блокировать на tenantId? Я бы использовал map, чтобы преобразовать моно в моно. Если lookupTenantBlobStorageFactory - это длительный метод, и вы хотите избежать блокировки текущего потока, publishOn должен вам помочь. Примерно так:

fun findTenantStorageFactory(tenantId: Mono<TenantId>):
  Mono<Either<BaseAzureBlobStorageException, MultitenantAzureBlobStorageFactory>> {
     return tenantId.publishOn(Schedulers.boundedElastic())
        .map({ id -> lookupTenantBlobStorageFactory(id, factories) })
}
...