Неблокирующая параллельная обработка и передача данных друг другу - PullRequest
0 голосов
/ 27 сентября 2018

Я хочу сделать параллелизм каждой родительской и дочерней сущности, в процессе, который должен быть возвращен быстро childEntities.Поэтому я не мог четко решить, какой путь подходит для этого процесса.Потому что в этих параллельных потоках также вызывается http-вызов и метод save SpringdataRepository один раз (я буду управлять размером потока из-за размера пула соединений JDBC).

Кстати, я только что попробовал библиотеку RxJava-2.

Я ожидал, что -> Если процесс параллельного потока выдает исключение, метод onErrorResumeNext (или рядом с чем-то) должен быть запущен и завершить весь процесс после исключения.Но он полностью приостанавливает поток.

Так что мне нужно -> Полностью неблокируемый параллельный поток, если одно из исключений выдает, просто перехватите его и затем продолжите остальную часть параллельного процесса.

Есть идеи ?Любые другие идеи решения приемлемы (например, ручное управление потоками)

Это то, что я пробовал, но не работает, как ожидалось.

package com.mypackage;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j
public class TestApp {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> createdParentEntities = new ArrayList<>();
        List<String> erroredResponses = new ArrayList<>();
        List<String> childEntities = new ArrayList<>();

        Flowable.range(1, 100) // 100: is not fixed normalle
                .parallel(100) // It will be changed according to size
                .runOn(Schedulers.io())
                .map(integer -> createParentEntity(String.valueOf(integer)))
                .sequential()
                .onErrorResumeNext(t -> {
                    System.out.println(t.getMessage());
                    if (t instanceof Exception) {
                        erroredResponses.add(t.getMessage());
                        return Flowable.empty();
                    } else {
                        return Flowable.error(t);
                    }
                })
                .blockingSubscribe(createdParentEntities::add);

        if (!createdParentEntities.isEmpty()) {
            Flowable.fromIterable(createdParentEntities)
                    .parallel(createdParentEntities.size())
                    .runOn(Schedulers.io())
                    .doOnNext(TestApp::createChildEntity)
                    .sequential()
                    .blockingSubscribe(childEntities::add);
        }

        System.out.println("====================");
        long time = System.currentTimeMillis() - start;
        log.info("Total Time : " + time);

        log.info("TOTAL CREATED ENTITIES : " + createdParentEntities.size());
        log.info("CREATED ENTITIES " + createdParentEntities.toString());
        log.info("ERRORED RESPONSES " + erroredResponses.toString());
        log.info("TOTAL ENTITIES : " + childEntities.size());
    }

    public static String createParentEntity(String id) throws Exception {
        Thread.sleep(1000); // Simulated for creation call
        if (id.equals("35") || id.equals("75")) {
            throw new Exception("ENTITIY SAVE ERROR " + id);
        }
        log.info("Parent entity saved : " + id);
        return id;
    }

    public static String createChildEntity(String parentId) throws Exception {
        Thread.sleep(1000);// Simulated for creation call
        log.info("Incoming entity: " + parentId);
        return "Child Entity:  " + parentId + " parentId";
    }
}
...