параллельный вызов с использованием rxjava 1 - PullRequest
0 голосов
/ 25 февраля 2019

Сценарий - сделать сервисный вызов для определенных элементов (предположим, 4 элемента), которые обновят БД.Последовательность не имеет значения.Все сервисные вызовы независимы друг от друга, но подождите, пока все вызовы не будут завершены, поэтому вы хотите перейти на параллельные вызовы, используя rxjava 1.x

Теперь проблема, с которой я сталкиваюсь, заключается в том, что сервис обновления невернуть что-нибудь, если это успешно.

public class DbClient {
  public void update(SomeObject someObject) {
  //update logic
  }
}

//client code to call update method

public void processUpdate(Map<String, SomeObject> map) {
 map.entrySet.stream.forEach(entry -> {
    dbClient.update(entry.getValue()); // how can I call this parallely using rxjava 1.x
    });
}

Ответы [ 2 ]

0 голосов
/ 25 февраля 2019

Вы можете использовать оператор flatMap для параллельного выполнения.Создайте List<Observable>, используя map, а затем выполните их, используя Observable.flatMap.Сосредоточиться на it.subscribeOn линии.Линия изменится последовательность на параллель .

См. здесь

public class MultipleExecutes {
    public class DbClient {
        Observable<String> update(String someObject) {
            // replace what you want.
            // Observable.fromCallable() <- consider this
            return Observable.just(someObject);
        }
    }

    // client code to call update method
    private List<Observable<String>> processUpdate(Map<String, String> map) {
        DbClient dbClient = new DbClient();

        return map
                .entrySet()
                .stream()
                .map(entry -> dbClient.update(entry.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("1", "1");
        map.put("2", "2");
        map.put("3", "3");
        map.put("4", "4");

        List<Observable<String>> o = new MultipleExecutes().processUpdate(map);
        Observable
                .fromIterable(o)
                .flatMap(it -> it.subscribeOn(Schedulers.computation())) <-- important line
                .subscribe(System.out::println);

        Thread.sleep(500);
    }
}

Выход

4
1
2
3
0 голосов
/ 25 февраля 2019

Обратите внимание, что RxJava 1 устарел и больше не поддерживается и не поддерживается как библиотека.

Вы можете flatMap каждый элемент карты на свое собственное реактивное действие, чтобы они выполнялись одновременно:

Observable.from(map.entrySet())
.flatMap(entry -> 
    Observable.create(emitter -> {
        dbClient.update(entry.getValue());
        emitter.onCompleted();
    })
    .subscribeOn(Schedulers.io()),
    true, // <------------ aggregate all errors
    8 // <---------------- number of concurrent updates you want
)
...