Динамически отправлять события / значения в поток во время выполнения приложения - PullRequest
0 голосов
/ 26 января 2019

Я пытаюсь создать реактивный конвейер, используя Java и проект-реактор, где сценарий использования заключается в том, что приложение генерирует состояние потока (INIT, PROCESSING, SAVED, DONE) на разных уровнях. Состояние должно передаваться асинхронно потоку, который необходимо обрабатывать независимо и отдельно от основного потока. Я наткнулся на эту ссылку:

Spring WebFlux (Flux): как публиковать динамически

Мой поток образцов выглядит примерно так:

public class StatusEmitterImpl implements StatusEmitter {

    private final FluxProcessor<String, String> processor;
    private final FluxSink<String> sink;

    public StatusEmitterImpl() {
        this.processor = DirectProcessor.<String>create().serialize();
        this.sink = processor.sink();
    }

    @Override
    public Flux<String> publisher() {
        return this.processor.map(x -> x);
    }

    public void publishStatus(String status) {
        sink.next(status);
    }
}

public class Try {

    public static void main(String[] args) {

    StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
    Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> 
        statusEmitter.publishStatus(x));
    statusEmitter.publisher().subscribe(x -> System.out.println(x));
    }
}

Проблема в том, что на консоли ничего не печатается. Я не могу понять, чего мне не хватает.

Ответы [ 2 ]

0 голосов
/ 28 января 2019

DirectProcessor является горячим издателем и не буферизует элемент, поэтому вы должны создать элемент после его подписки. Подобно is

public static void main(String[] args) {

        StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
        statusEmitter.publisherA().subscribe(x -> System.out.println(x));
        Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> statusEmitter.publishStatus(x));
    }

, или используйте EmitterProcessor, UnicastProcessor вместо DirectProcessor.

0 голосов
/ 28 января 2019

DirectProcessor передает значения зарегистрированному Subscribers напрямую, без кеширования сигналов.Если нет Subscriber, то значение «забыто».Если Subscriber приходит с опозданием, то он будет получать только сигналы, испускаемые после того, как подписался.

Вот что здесь происходит: потому что fromIterable работает с коллекцией в памяти,у него есть время выдвинуть все значения в DirectProcessor, который к тому времени еще не зарегистрировал Subscriber.

Если вы инвертируете две последние строки, вы должны увидеть что-то.

...