Я хочу сделать параллелизм каждой родительской и дочерней сущности, в процессе, который должен быть возвращен быстро childEntities
.Поэтому я не мог четко решить, какой путь подходит для этого процесса.Потому что в этих параллельных потоках также вызывается http-вызов и метод save
SpringdataRepository один раз (я буду управлять размером потока из-за размера пула соединений JDBC).
Кстати, я только что попробовал библиотеку RxJava-2.
Я ожидал, что -> Если процесс параллельного потока выдает исключение, метод onErrorResumeNext
(или рядом с чем-то) должен быть запущен и завершить весь процесс после исключения.Но он полностью приостанавливает поток.
Так что мне нужно -> Полностью неблокируемый параллельный поток, если одно из исключений выдает, просто перехватите его и затем продолжите остальную часть параллельного процесса.
Есть идеи ?Любые другие идеи решения приемлемы (например, ручное управление потоками)
Это то, что я пробовал, но не работает, как ожидалось.
package com.mypackage;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class TestApp {
public static void main(String[] args) {
long start = System.currentTimeMillis();
List<String> createdParentEntities = new ArrayList<>();
List<String> erroredResponses = new ArrayList<>();
List<String> childEntities = new ArrayList<>();
Flowable.range(1, 100) // 100: is not fixed normalle
.parallel(100) // It will be changed according to size
.runOn(Schedulers.io())
.map(integer -> createParentEntity(String.valueOf(integer)))
.sequential()
.onErrorResumeNext(t -> {
System.out.println(t.getMessage());
if (t instanceof Exception) {
erroredResponses.add(t.getMessage());
return Flowable.empty();
} else {
return Flowable.error(t);
}
})
.blockingSubscribe(createdParentEntities::add);
if (!createdParentEntities.isEmpty()) {
Flowable.fromIterable(createdParentEntities)
.parallel(createdParentEntities.size())
.runOn(Schedulers.io())
.doOnNext(TestApp::createChildEntity)
.sequential()
.blockingSubscribe(childEntities::add);
}
System.out.println("====================");
long time = System.currentTimeMillis() - start;
log.info("Total Time : " + time);
log.info("TOTAL CREATED ENTITIES : " + createdParentEntities.size());
log.info("CREATED ENTITIES " + createdParentEntities.toString());
log.info("ERRORED RESPONSES " + erroredResponses.toString());
log.info("TOTAL ENTITIES : " + childEntities.size());
}
public static String createParentEntity(String id) throws Exception {
Thread.sleep(1000); // Simulated for creation call
if (id.equals("35") || id.equals("75")) {
throw new Exception("ENTITIY SAVE ERROR " + id);
}
log.info("Parent entity saved : " + id);
return id;
}
public static String createChildEntity(String parentId) throws Exception {
Thread.sleep(1000);// Simulated for creation call
log.info("Incoming entity: " + parentId);
return "Child Entity: " + parentId + " parentId";
}
}