Обработка парализованных данных от издателя - PullRequest
1 голос
/ 18 февраля 2020

Как обрабатывать данные, паралельно подписанные с Publisher?

  1. Должен ли я подписаться на al oop в пуле работников? Когда я звоню подписаться на Project Reactor, я получил только один кусок данных. Как «истощить все»?
  2. Как обеспечить, чтобы каждый работник получал разные куски данных?

Ответы [ 2 ]

1 голос
/ 19 февраля 2020

В Project Reactor используйте оператор parallel():

Flux.from(thePublisher) //if we don't assume publisher is already a Flux
    .parallel() //instruct the Flux to divide work on "rails",
    //but so far these rails are running on the same thread !
    .runOn(Schedulers.parallel()) //now each rail runs on its own thread
    .map(...).etc(...)
    .sequential() //merge the rails back to a single sequence
    //subscribe, or continue processing sequentially

Rx Java 2 очень похож и имеет тот же оператор parallel.

0 голосов
/ 18 февраля 2020

Преобразуйте каждый кусок данных в задачу и отправьте его в Executor. Конвертер может выглядеть следующим образом:

class Converter implements Subscriber<T> {
    final Executor executor;
    Subscription subscription;

    Converter(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override
    public void onNext(T data) {
        executor.execute(()->process(data));
        subscription.request(1);
    }
    ...        
    void process(T o) {
        ...
    }
}
...