Когда потребитель подписывается на Flux, это замедляет другого потребителя. - PullRequest
0 голосов
/ 02 июля 2019

У меня есть сценарий использования, в котором мне нужно создать холодного издателя из слушателя. Я использовал методы publish и autoConnect для создания ConnectableFlux и запуска издателя на первом подписчике. Я также использовал publishOn для запуска потребителей в разных потоках. Я подписываюсь дважды с разными потребителями, потребляя данные с разной скоростью. Проблема, как только я подписываюсь на второе время, которое является очень медленным потребителем, оно замедляет первого потребителя к его собственному ритму.

package test;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ProblemSample {

public static void main(String[] args) {
    EventProcessor myEventProcessor = new EventProcessor();
    Flux<String> flux = Flux.<String>create(sink -> {
        MyEventListener<String> myEventListener = new MyEventListener<String>() {

            public void onDataChunk(String chunk) {
                sink.next(chunk);
                System.out.println("onnext request " + Thread.currentThread().getName());
            }

            public void processComplete() {
                sink.complete();
            }
        };
        myEventProcessor.register(myEventListener);
    }).publish().autoConnect().publishOn(Schedulers.newParallel("reactor"), 1);

    flux.subscribe((v) -> {
        System.out.println("1111111 having received " + Thread.currentThread().getName() + " value : " + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("------------------subscribe second main---------------------");

    flux.subscribe((v) -> {
        System.out.println("22222222 having received " + Thread.currentThread().getName() + " value : " + v);
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println("main thread ended");

}

static class EventProcessor {

    public void register(MyEventListener<String> myEventListener) {
        for (int i = 0; i < 10; i++) {
            myEventListener.onDataChunk("chunk " + i);
        }
        myEventListener.processComplete();
    }

}

static interface MyEventListener<T> {
    public void onDataChunk(String chunk);

    public void processComplete();
}

}

Я ожидаю, что оба потребителя будут использовать данные каждый со своим собственным ритмом, не влияя друг на друга.

...