Невозможно запустить одновременных подписчиков, используя циклоп-реакцию - PullRequest
0 голосов
/ 15 мая 2018

Можно ли иметь одновременных подписчиков, использующих библиотеку циклоп-реагировать?Например, если запустить следующий код:

    ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1, 2, 3, 4, 5, 6);

    ReactiveSubscriber<Integer> sub1 = Spouts.reactiveSubscriber();
    ReactiveSubscriber<Integer> sub2 = Spouts.reactiveSubscriber();

    FutureStream<Integer> futureStream = FutureStream.builder().fromStream(initialStream)
            .map(v -> v -1);

    futureStream.subscribe(sub1);
    futureStream.subscribe(sub2);

    CompletableFuture future1 = CompletableFuture.runAsync(() -> sub1.reactiveStream().forEach(v -> System.out.println("1 -> " + v)));
    CompletableFuture future2 = CompletableFuture.runAsync(() -> sub2.reactiveStream().forEach(v -> System.out.println("2 -> " + v)));

    try {
        future1.get();
        future2.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

, я получу следующий результат:

1 -> 0 
2 -> 0
2 -> 1
1 -> 0
1 -> 1
1 -> 1
2 -> 2
2 -> 3
2 -> 4
2 -> 5
1 -> 2
1 -> 2
1 -> 3
1 -> 4
1 -> 5
1 -> 3
1 -> 4
1 -> 5

Я получаю повторные значения в потоках подписчиков.Заранее благодарю за любую помощь.

1 Ответ

0 голосов
/ 25 июля 2018

Циклоп-реакция поддерживает только отдельных подписчиков. Я думаю, что поведение здесь должно быть изменено, чтобы игнорировать вторую попытку подписки, а не позволять ей испортить оба (я сообщу об ошибке - спасибо!).

Однако вы можете использовать Темы с тем же эффектом. Мы можем переписать ваш пример, используя темы

ReactiveSeq<Integer> initialStream = ReactiveSeq.of(1,2,3,4,5,6);



        FutureStream<Integer> futureStream = FutureStream.builder()
                                                         .fromStream(initialStream)
                                                         .map(v -> v -1);
        Queue<Integer> queue= QueueFactories.<Integer>boundedNonBlockingQueue(1000).build();
        Topic<Integer> topic = new Topic<Integer>(queue,QueueFactories.<Integer>boundedNonBlockingQueue(1000));

        ReactiveSeq<Integer> s2 = topic.stream();
        ReactiveSeq<Integer> s1 = topic.stream();

        Thread t = new Thread(()->{
            topic.fromStream(futureStream);
            topic.close();
        });
        t.start();


        CompletableFuture future1 = CompletableFuture.runAsync(() -> s1.forEach(v -> System.out.println("1 -> " + v)));
        CompletableFuture future2 = CompletableFuture.runAsync(() -> s2.forEach(v -> System.out.println("2 -> " + v)));

        try {

            future1.get();
            future2.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

И результат больше соответствует ожидаемому

2 -> 0 1 -> 0 2 -> 1 1 -> 1 2 -> 2 1 -> 2 2 -> 3 1 -> 3 2 -> 4 1 -> 4 2 -> 5 1 -> 5

...