Реактивные потоки: как ждать всех издателей по ключу? - PullRequest
0 голосов
/ 23 июня 2019

Предположим, у меня есть 3 издателя и 1 процессор.Издатели выпускают элементы в форме {key: <integer>, value: <object>, publisher_id: <string>}.

Издатели выполняют операции ввода-вывода, поэтому:

  • С одной стороны, я бы хотел, чтобы издатели работали (примерно) N элементов в данный момент.
  • С другой стороны, я бы хотел, чтобы потребитель объединил элементов в одну запись (т. Е. {key: <integer>, values: <list>})

Я уже реализовал FluxProcessor, который имеет внутреннее хранилище (ConcurrentHashMap) для хранения всех элементов.Он вручную request() новых элементов всякий раз, когда CAPACITY не был достигнут.

Я хотел бы знать, есть ли встроенная функциональность для этого с RxJava (2) / Spring Reactor API?

1 Ответ

1 голос
/ 27 июня 2019

Использовать слияние, перезапускать запросы и toMultimap с RxJava 2:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
subscribe(map -> System.out.println(map));
...