потерять некоторые предметы, используя Flux.pu sh () в Reactor - PullRequest
2 голосов
/ 07 января 2020

Мой код выглядит следующим образом:

public class SequenceCreator {

    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.push(sink -> consumer = items -> items.forEach(sink::next));
    }

    public static void main(String[] args) throws InterruptedException {
        SequenceCreator sequenceCreator = new SequenceCreator();

        List<Integer> sequence1 = Lists.newArrayList(1,2,3,4,5);
        List<Integer> sequence2 = Lists.newArrayList(6,7,8,9,10);

        Thread producingThread1 = new Thread(
                () -> sequenceCreator.consumer.accept(sequence1));

        Thread producingThread2 = new Thread(
                () -> sequenceCreator.consumer.accept(sequence2));

        sequenceCreator.createNumberSequence().subscribe(System.out::println);

        producingThread1.start();
        producingThread2.start();

        while (true) {
            Thread.sleep(1000);
        }
    }
}

Вывод

1 2 3 4 5 7 8 9 10

Я не знаю, почему число 6 не выводится? Является ли причиной многопоточности?

1 Ответ

2 голосов
/ 07 января 2020

Я не знаю, почему число 6 не выводится? Является ли это причиной многопоточности?

Да, почти наверняка. Посмотрите на Javado c для Flux.push:

Программно создайте Flux с возможностью испускания нескольких элементов от однопоточного производителя через API FluxSink. Для многопоточной альтернативной возможности см. create (Consumer).

Вы не используете однопоточного производителя (нарушая задокументированное требование), поэтому поведение по сути, неопределенным в этом сценарии. Вам нужно переключиться на Flux.create, как предполагает do c, так как вы используете несколько потоков для публикации sh.

...