Reactor 3 Emitter / Subscriber paralllel - PullRequest
       9

Reactor 3 Emitter / Subscriber paralllel

0 голосов
/ 21 февраля 2019

Я новичок в Реактивном программировании и у меня много вопросов.Я думаю, что это не недостаток примеров или документации, просто мое понимание неверно.

Я пытаюсь подражать медленному подписчику;

Вот пример кода

Flux.create(sink -> {
    int i = 0;
    while (true) {
        try {
            System.out.println("Sleep for " + MILLIS);
            Thread.sleep(MILLIS);
            int it = i++;
            System.out.println("Back to work, iterator " + it);
            sink.next(it);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
    try {
        System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
        Thread.sleep(MILLIS + 4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

Системный выход -

Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]

Я подумал, что если подписчик работает медленно, я должен увидеть больше потоков из-за Schedulers.elastic()

Также я попытался сделать publishOn(), и кажетсякак я делаю это асинхронно, но все еще не смог обработать результат в нескольких потоках.

Спасибо за комментарии и ответы.

1 Ответ

0 голосов
/ 21 февраля 2019

Если вы хотите, чтобы он работал в разных потоках, вам нужно использовать .parallel () вот так, а emit будет помещен в другой поток

Flux.create(sink -> {
        int i = 0;
        while (true) {
            try {
                System.out.println("Sleep for " + MILLIS);
                Thread.sleep(100);
                int it = i++;
                System.out.println("Back to work, iterator " + it);
                sink.next("a");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    })

            .parallel()
            .runOn(Schedulers.elastic())

            .subscribe(x -> {
                try {
                    System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
                    Thread.sleep(100 + 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
    ;
}
...