Нужна помощь с Java многопоточность
У меня есть случай, как показано ниже:
Есть много записей. Каждая запись имеет около 250 полей. Каждое поле должно быть проверено по предопределенному правилу.
Поэтому я определил класс FieldInfo для представления каждого поля:
public class FieldInfo {
private String name;
private String value;
private String error_code;
private String error_message;
// ignore getters and setters
}
Запись класса для представления записи:
public class Record {
List<FieldInfo> fields;
// omit getter and setter here
}
, а также интерфейс и класс правил:
public interface BusinessRule {
// validating one field needs some other fields' value in the same record. So the list of all fields for a certain record passed in as parameter
public FieldInfo validate(List<FieldInfo> fields);
}
public class FieldName_Rule implements BusinessRule {
public FieldInfo validate(List<FieldInfo> fields) {
// will do
// 1. pickup those fields required for validating this target field, including this target field
// 2. performs validation logics A, B, C...
// note: all rules only read data from a database, no update/insert operations.
}
}
Пользователь может отправлять 5000 или более записей одновременно для процесса. Требования к производительности высоки. Я думал о том, чтобы иметь несколько потоков для отправленных, например, 5000 записей (означает, что один поток запускает несколько записей), и в каждом потоке разветвлять еще несколько потоков в каждой записи для запуска правил.
Но, к сожалению, такая встроенная многопоточность всегда умирала в моем случае.
Вот некоторые ключевые части из приведенного выше решения:
public class BusinessRuleService {
@Autowired
private ValidationHandler handler;
public String process(String xmlRequest) {
List<Record> records = XmlConverter.unmarshall(xmlRequest).toList();
ExecutorService es = Executors.newFixedThreadPool(100);
List<CompletableFuture<Integer> futures =
records.stream().map(r->CompletableFuture.supplyAsync(()-> handler.invoke(r), es)).collect(Collectors.toList());
List<Integer> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println("total records %d processed.", result.size());
es.shutdown();
return XmlConverter.marshallObject(records);
}
}
@Component
public class ValidationHandlerImpl implements ValidationHandler {
@Autowired
private List<BusinessRule> rules;
@Override
public int invoke(Record record) {
ExecutorService es = Executors.newFixedThreadPool(250);
List<CompletableFuture<FieldInfo> futures =
rules.stream().map(r->CompletableFuture.supplyAsync(()-> r.validate(record.getFields()), es)).collect(Collectors.toList());
List<FieldInfo> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println("total records %d processed.", result.size());
es.shutdown();
return 0;
}
}
Рабочий процесс: Пользователь отправляет список записей в xml строковом формате. Одна из конечных точек приложения запускает метод процесса в объекте BusinessRuleService. Процесс использует CompletableFuture для составления задач и передачи задач в ExecutorService, который имеет пул потоков размером 100. Каждая задача в списке CompletableFuture затем запускает объект ValidationHandler. Объект ValidationHandler создает другую задачу CompletableFuture и передает задачу другому ExecutorService, размер пула которого совпадает с размером списка правил.
Приведенное выше решение является правильным?
Примечание: мое текущее решение: отправленные записи обрабатываются последовательно. И 250 правил обрабатываются параллельно для каждой записи. С этим решением требуется более 2 часов для 5000 записей. Такая низкая производительность неприемлема для бизнеса.
Я очень новичок в параллельном / многопоточном программировании. Большое спасибо за все виды помощи!