Являются ли вызовы подписки потребительского аргумента последовательными в Project Reactor? - PullRequest
0 голосов
/ 25 февраля 2019

С помощью следующего кода:

flux.subscribe(consumer)

вызовы consumer могут осуществляться в разных потоках, в зависимости от того, как был построен flux (например, с использованием subscribeOn или publishOn).Существует ли гарантия того, что, хотя вызовы consumer могут выполняться в разных потоках, вызовы являются последовательными, т. Е. Каждый вызов завершается до начала следующего?

Более конкретный пример ниже (с использованием Reactor-Kafka):

val resultFlux: Flux<Pair<TopicPartition, Long>> = KafkaReceiver
    .create<K, V>(receiverOptions)
    .receive()
    .groupBy { m -> m.receiverOffset().topicPartition() }
    .flatMap { partitionFlux ->
        val parallelRoFlux = partitionFlux
                .publishOn(scheduler)
                .flatMapSequential(::processRecord, parallelism)
        parallelRoFlux.map { ro ->
            acknowledge(ro)
            Pair(ro.topicPartition(), ro.offset())
        }
    }

resultFlux.doOnNext { Thread.sleep(2000); log.info("doOnNext: $it") }
        .subscribe { Thread.sleep(1000); log.info("subscribe: $it") }

создает следующий выходной фрагмент:

13:44:26.401 [elastic-6] INFO  consumerSvcFlow - Message_5>>>processed
13:44:28.402 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 15)
13:44:29.402 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 15)
13:44:29.435 [elastic-8] INFO  consumerSvcFlow - Message_8>>>processed
13:44:31.435 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 16)
13:44:32.436 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 16)
13:44:32.461 [elastic-6] INFO  consumerSvcFlow - Message_9>>>processed
13:44:34.462 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 17)
13:44:35.462 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 17)
13:44:35.494 [elastic-8] INFO  consumerSvcFlow - Message_15>>>processed
13:44:37.494 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 18)
13:44:38.495 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 18)
13:44:38.497 [elastic-6] INFO  consumerSvcFlow - Message_18>>>processed
13:44:40.498 [elastic-6] INFO  consumerExecutable - doOnNext: (demo-topic-0, 19)
13:44:41.499 [elastic-6] INFO  consumerExecutable - subscribe: (demo-topic-0, 19)
13:44:41.539 [elastic-8] INFO  consumerSvcFlow - Message_19>>>processed
13:44:43.540 [elastic-8] INFO  consumerExecutable - doOnNext: (demo-topic-0, 20)
13:44:44.540 [elastic-8] INFO  consumerExecutable - subscribe: (demo-topic-0, 20)

Вызовы аргумента subscribe customer являются последовательными, но некоторые вызовы выполняются в потоке [astic-6]и некоторые находятся в потоке [упругий-8].

1 Ответ

0 голосов
/ 26 февраля 2019

Да, есть такая гарантия, согласно спецификации Reactive Streams.

Во-первых, вызовы могут происходить в другом потоке , чем тот, из которого вы вызвали subscribe().Но все потребительские вызовы происходят в одном и том же потоке.

Во-вторых, потребитель значения в методе subscribe(Consumer<T>) фактически считается сигналом onNext в Subscriber, поэтому спецификация обеспечиваетчто такие вызовы сериализуются по отношению друг к другу и к сигналам onComplete и onError.

Редактировать: теперь, когда вы добавили некоторый фрагмент, тот факт, что у вас есть 2 потока, происходит изpublishOn сделано внутри flatMap.Поэтому каждая группа groupBy может выбрать различный Worker из Scheduler (если у него их много).Следовательно, обработка, выполняемая в этих внутренних последовательностях, может выполняться параллельно.Результат однако при объединении в flatMap сериализуются => subscribe(Consumer) являются последовательными.

...