Как динамически добавлять элементы в горячий флюс реактора другим способом? - PullRequest
0 голосов
/ 07 мая 2020

У меня есть служба источника данных, которая принимает в качестве параметра наблюдателя.

void subscribe(Consumer onEventConsumer);

Я хочу использовать поток в качестве потока ответа для RSocket. Как я могу это сделать? Как я вижу сейчас, это должно быть что-то вроде

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

Но я сильно сомневаюсь, что это правильное решение, и я новичок в реактивном подходе, я не знаю, какие методы мне следует использовать здесь?

Ответы [ 2 ]

0 голосов
/ 09 мая 2020

это типичный вариант использования Flux.create. вы регистрируете наблюдателя внутри лямбда-выражения create, который будет передавать полученные данные в предоставленный FluxSink

0 голосов
/ 08 мая 2020

У меня была аналогичная ситуация, когда мне нужно было потреблять и передавать сообщения в потоке. Итак, это упрощенная версия, комментарии в коде:

public class Main {
    // create queue for storing the messages
    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
    private static final Consumer<String> consumer = s -> {
        // block another spawned thread if no more space is present
        try { queue.put(s); } catch (InterruptedException e) {}
    };

    public static void main(String[] args) throws Exception {
        // size shouldn't be more than our queue size
        IntStream.range(0, 50).forEach(Main::consume);
        fluxGenerator().subscribe(); // subscribe to flux
        // write to the queue after subscribing
        // here the size can be anything as long as subscriber can handle it
        IntStream.range(50, 100000).forEach(Main::consume);
    }

    static Flux<String> fluxGenerator() {
        return Flux.<String>generate(sink -> {
            // block another spawned thread if no more elements are present
            try {sink.next(queue.take()); } catch (InterruptedException e) {}
        })
                // we need to subscribe on another thread
                .subscribeOn(Schedulers.newSingle("async"))
                .log();
    }

    static void consume(String str) {
        consumer.accept(str); // consume the messages
    }

    static void consume(Number i) {
        consume("" + i);
    }

}
...