Используя RXJava 2, я пытаюсь создать асинхронную шину событий.
У меня есть одноэлементный объект со свойством PublishSubject
. Эмиттеры могут отправить событие на шину, используя onNext
на тему.
Если подписчикам нужно выполнить длинную задачу, я хочу, чтобы моя шина распределяла задачи по нескольким потокам для одновременного выполнения задач. Это означает, что я хочу, чтобы работа над элементом началась сразу же после его отправки, даже если работа над предыдущим элементом не завершена.
Однако, даже используя observeOn
с scheduler
, я не могу выполнять свои задачи одновременно.
Пример кода:
public void test() throws Exception {
Subject<Integer> busSubject = PublishSubject.<Integer>create().toSerialized();
busSubject.observeOn(Schedulers.computation())
.subscribe(new LongTaskConsumer());
for (int i = 1; i < 5; i++) {
System.out.println(i + " - event");
busSubject.onNext(i);
Thread.sleep(1000);
}
Thread.sleep(1000);
}
private static class LongTaskConsumer implements Consumer<Integer> {
@Override
public void accept(Integer i) throws Exception {
System.out.println(i + " - start work");
System.out.println(i + " - computation on thread " + Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println(i + " - end work");
}
}
Печать:
1 - event
1 - start work
1 - computation on thread RxComputationThreadPool-1
2 - event
3 - event
1 - end work
2 - start work
2 - computation on thread RxComputationThreadPool-1
4 - event
2 - end work
3 - start work
3 - computation on thread RxComputationThreadPool-1
3 - end work
4 - start work
4 - computation on thread RxComputationThreadPool-1
4 - end work
Это означает, что работа над элементом 2 ожидала окончания работы над элементом 1, даже если событие 2 уже было отправлено.