Наблюдается многократный поток данных на одном планировщике, событие не используется в том же порядке, что и отправленное - PullRequest
0 голосов
/ 04 марта 2019

Я использую событие, используя разные внешние источники / перенос подписки на разные Flowable.Источник не имеет значения, так как я мог воспроизвести проблему с помощью простого цикла.У меня есть:

  • Другой FlowableEmitter (3 достаточно для воспроизведения)
  • Одиночная нить для эмиттера (ниже находится основная нить)
  • Одиночная нить для подписчика (newSingleThreadExecutor)

Этот простой код воспроизводит

    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final Scheduler scheduler = Schedulers.from(executor);

    List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
    for (int i=0; i<3; ++i) {
        final int finalI = i;
        Flowable.create( new FlowableOnSubscribe<Long>(){
            @Override
            public void subscribe(FlowableEmitter<Long> emitter) {
                emitterList.add(emitter);
            }
        },  BackpressureStrategy.MISSING)
                .observeOn(scheduler)
                .subscribe(
                        val -> System.out.println(
                            "[" +Thread.currentThread().getName()
                            + "] Flow:" + finalI 
                            + " > " + Long.toString(val)));
    }

    long state = 0;
    for (int i=0; i<5; ++i) {
        for (FlowableEmitter<Long> emitter: emitterList){
            emitter.onNext(++state);
        }
    }

    executor.shutdown();

Моя проблема заключается в том, что события не используются в том же порядке, в котором они были отправлены.если я удаляю наблюдаемую (планировщик), она работает нормально, но мне нужно излучатель и подписчик на разных потоках.Я также протестировал разные BackpressureStrategy, и это не помогает.
Любая подсказка, чтобы все номера были подписаны / напечатаны по порядку (1,2,3,4,5 ... 14,15) вместо того, что я имею ниже

[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15

Я использую rx-java 2.2.5 и Java 8, если это имеет значение.

1 Ответ

0 голосов
/ 04 марта 2019

observeOn несправедливо и может обнять поток эмиссии, если есть еще работа.При круговом излучении один источник может быть готов излучать больше, и, таким образом, вы не получите идеальное чередование.Также обратите внимание, что при использовании стратегии MISSING вы подвержены исключениям противодавления.Попробуйте requestObserveOn из проекта расширений:

final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);

List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
    final int finalI = i;
    Flowable.create( new FlowableOnSubscribe<Long>(){
        @Override
        public void subscribe(FlowableEmitter<Long> emitter) {
            emitterList.add(emitter);
        }
    },  BackpressureStrategy.BUFFER)
            // ---------------------------------------------------------
            .compose(FlowableTransformers.requestObserveOn(scheduler))
            // ---------------------------------------------------------
            .subscribe(
                    val -> System.out.println(
                        "[" +Thread.currentThread().getName()
                        + "] Flow:" + finalI 
                        + " > " + Long.toString(val)));
}

long state = 0;
for (int i=0; i<5; ++i) {
    for (FlowableEmitter<Long> emitter: emitterList){
        emitter.onNext(++state);
    }
}

executor.shutdown();
...