Вызовы последовательности исполнителя Project Reactor (или RxJava2) - PullRequest
0 голосов
/ 28 ноября 2018

У меня есть следующая задача, и я хочу решить ее с помощью Project Reactor (или RxJava)

Есть источники событий.Каждое событие состоит из serviceId и некоторой полезной нагрузки.Как только событие получено, нам нужно выполнить действие для указанного serviceId с полезной нагрузкой.Но мы должны убедиться, что промежуток времени между двумя запросами к одному и тому же идентификатору службы должен быть больше или равен одной секунде.Но запросы к разностным сервисам могут выполняться параллельно.

Также следует отметить, что подсчет сервисов происходит динамически.

Это выглядит как на следующем рисунке

enter image description here

В настоящее время у меня есть следующий код:

Flux.create((sink-> eventProvider.listen(new EventListner(){
                public void event(req) {
                    sink.next(req);
                }
            })))
        /* need some logic here */
        .flatMap(req -> requestExecutor.execute(req))
        .doOnNext(res -> responseProcessor.process(res))
        .subscribe();

У вас есть идеи?

Ответы [ 2 ]

0 голосов
/ 29 ноября 2018

Flux.groupBy () поможет вам в этом случае.Оператор использует функцию отображения для создания ключей и группирует испускаемые элементы на основе ключа.Вы можете рассматривать serviceId как ключ.

Flux.create((sink-> eventProvider.listen(new EventListner(){
            public void event(req) {
                sink.next(req);
            }
        })))
    .groupBy(req -> req.getServiceId()) //group req by serviceId
    .flatMap(reqGroup-> reqGroup..delayElements(Duration.ofSeconds(1)) //add minimum delay to the group
    .flatMap(req -> requestExecutor.execute(req))
    .doOnNext(res -> responseProcessor.process(res))
    .subscribe();

Вы также можете добавить различные задержки в зависимости от serviceId.Проверьте приведенный ниже фрагмент в качестве примера - четные целые числа будут задерживаться на 2 секунды, а нечетные на 1 секунду.

    Flux.range(1, 20)
    .groupBy(integer -> integer % 2)
    .flatMap(integerGroupedFlux -> {
      Flux<Integer> integerFlux;
      if (integerGroupedFlux.key() == 0) { //even integers
        integerFlux = integerGroupedFlux.delayElements(Duration.ofSeconds(2));
      } else {
        integerFlux = integerGroupedFlux.delayElements(Duration.ofSeconds(1));
      }
      return integerFlux;
    })
    .subscribe(System.out::println);
0 голосов
/ 28 ноября 2018

Если события идентифицируют службу, в которой они инициируют вызовы, вы можете использовать оператор groupBy() для разделения потоков по службам.Чтобы ввести задержку после каждого запроса на обслуживание, используйте flatMap() с параметром для однопоточного использования.

В RxJava:

observable
  .groupBy(event -> getServiceId( event )) // 1
  .flatMap(serviceObservable -> // 2
       serviceObservable // 3
         .flatMap( ev -> service(serviceObservable.getKey(), ev), 1) // 4
                           .delay(1, TimeUnit.SECONDS)) // 5
  .subscribe();
  1. Группируйте события по службечто они будут использовать.Идентификатор будет использоваться в качестве ключа позже.Это будет генерировать новые элементы при обнаружении нового идентификатора службы.
  2. serviceObservable - это GroupByObservable, который будет обработан ниже.
  3. Каждый выброс из этой наблюдаемой является событием, которое предполагаетсядля перехода на один сервис.
  4. serviceObservable.getKey() возвращает идентификатор используемого сервиса.Я изобрел метод service(), который отправляет событие в службу по идентификатору службы.Кроме того, параметр 1 указывает flatMap() на однопоточность операции, поэтому одновременно может выполняться только один запрос на обслуживание.
  5. delay() (или любой другой оператор, которого вы пожелаете) будет ожидатьза секунду до освобождения операции.

(Отказ от ответственности: этот код не тестировался, но в прошлых проектах я делал подобные виды планирования, поэтому основная идея - надежная.)

...