Я использую событие, используя разные внешние источники / перенос подписки на разные Flowable.Источник не имеет значения, так как я мог воспроизвести проблему с помощью простого цикла.У меня есть:
- Другой FlowableEmitter (3 достаточно для воспроизведения)
- Одиночная нить для эмиттера (ниже находится основная нить)
- Одиночная нить для подписчика (newSingleThreadExecutor)
Этот простой код воспроизводит
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);
List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
final int finalI = i;
Flowable.create( new FlowableOnSubscribe<Long>(){
@Override
public void subscribe(FlowableEmitter<Long> emitter) {
emitterList.add(emitter);
}
}, BackpressureStrategy.MISSING)
.observeOn(scheduler)
.subscribe(
val -> System.out.println(
"[" +Thread.currentThread().getName()
+ "] Flow:" + finalI
+ " > " + Long.toString(val)));
}
long state = 0;
for (int i=0; i<5; ++i) {
for (FlowableEmitter<Long> emitter: emitterList){
emitter.onNext(++state);
}
}
executor.shutdown();
Моя проблема заключается в том, что события не используются в том же порядке, в котором они были отправлены.если я удаляю наблюдаемую (планировщик), она работает нормально, но мне нужно излучатель и подписчик на разных потоках.Я также протестировал разные BackpressureStrategy, и это не помогает.
Любая подсказка, чтобы все номера были подписаны / напечатаны по порядку (1,2,3,4,5 ... 14,15) вместо того, что я имею ниже
[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15
Я использую rx-java 2.2.5 и Java 8, если это имеет значение.