SubscribeOn или PublishOn не работают с ReactiveCassandraCrudRepository - PullRequest
0 голосов
/ 23 апреля 2020

Мы используем Reactive Spring Data Repository с Spring WebFlux. Мое понимание для SubscribeOn говорит, что он решает, какой ThreadPool будет выполнять операторы до того, как SubscribeOn будет выполняться в потоке, а PublishOn решает фактический ThreadPool, для которого будет выполняться подписка. Однако в приведенном ниже коде, даже с PublishOn и SubscribeOn, код не выполняется в основном потоке, скорее он возвращается к Cluster-nio-worker-1.

    System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

Также, что делает Thread [cluster- nio-worker-1,5, main] значит? Почему эти вызовы методов не используют основной поток для выполнения.

1 Ответ

1 голос
/ 23 апреля 2020

Метод подписки заставляет издателя использовать данный пул потоков для публикации значений. В конвейере может быть N число метода subscribeOn. Самый последний из них вступит в силу. personRepository.findAll().log() является оберткой и возвращает поток. Так что, если он использует какие-либо Планировщики внутри, то вы не можете изменить его, используя подписку. Например, метод interval использует параллель, и я не могу изменить его на boundedElasti c, как показано здесь.

    Flux.interval(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())

Schedulers.immediate просто сохраняет выполнение конвейера в том же потоке. Это не главное и в вашем случае это будет cluster-nio-worker пул потоков.

Мы можем переключиться с основного на любой пул потоков Планировщика. Но мы не можем переключить выполнение обратно в основной поток. Это не ограничение проекта реактора. Это должно быть ограничение Java.

...