Как создать асинхронную шину событий, используя RxJava 2? - PullRequest
0 голосов
/ 09 мая 2018

Используя RXJava 2, я пытаюсь создать асинхронную шину событий.

У меня есть одноэлементный объект со свойством PublishSubject. Эмиттеры могут отправить событие на шину, используя onNext на тему.

Если подписчикам нужно выполнить длинную задачу, я хочу, чтобы моя шина распределяла задачи по нескольким потокам для одновременного выполнения задач. Это означает, что я хочу, чтобы работа над элементом началась сразу же после его отправки, даже если работа над предыдущим элементом не завершена.

Однако, даже используя observeOn с scheduler, я не могу выполнять свои задачи одновременно.

Пример кода:

public void test() throws Exception {
    Subject<Integer> busSubject = PublishSubject.<Integer>create().toSerialized();

    busSubject.observeOn(Schedulers.computation())
            .subscribe(new LongTaskConsumer());

    for (int i = 1; i < 5; i++) {
        System.out.println(i + " - event");
        busSubject.onNext(i);
        Thread.sleep(1000);
    }
    Thread.sleep(1000);
}

private static class LongTaskConsumer implements Consumer<Integer> {
    @Override
    public void accept(Integer i) throws Exception {
        System.out.println(i + " -   start work");
        System.out.println(i + " -     computation on thread " + Thread.currentThread().getName());
        Thread.sleep(2000);
        System.out.println(i + " -   end work");
    }
}

Печать:

1 - event
1 -   start work
1 -     computation on thread RxComputationThreadPool-1
2 - event
3 - event
1 -   end work
2 -   start work
2 -     computation on thread RxComputationThreadPool-1
4 - event
2 -   end work
3 -   start work
3 -     computation on thread RxComputationThreadPool-1
3 -   end work
4 -   start work
4 -     computation on thread RxComputationThreadPool-1
4 -   end work

Это означает, что работа над элементом 2 ожидала окончания работы над элементом 1, даже если событие 2 уже было отправлено.

1 Ответ

0 голосов
/ 09 мая 2018

Когда происходит следующий вызов, один рабочий создается из Schedulers.computation() и используется для всего потока. Вот почему вся ваша работа выполнена на RxComputationThreadPool-1.

busSubject.observeOn(Schedulers.computation())
        .subscribe(new LongTaskConsumer());

Чтобы запланировать работу над несколькими потоками:

busSubject.flatMap(x ->
        Flowable.just(x)
            .subscribeOn(Schedulers.computation()
            .doOnNext(somethingIntensive))
    .subscribe(new LongTaskConsumer());

Обратите внимание, что интенсивная работа выполняется внутри flatMap, а не в LongTaskConsumer, поскольку все элементы будут поступать последовательно к LongTaskConsumer.

Существуют и другие подходы к параллельной работе, которые вы можете исследовать в зависимости от того, сколько событий попадает в PublishSubject.

...