Java Поддержка встроенной многопоточности - PullRequest
0 голосов
/ 21 февраля 2020

Нужна помощь с Java многопоточность

У меня есть случай, как показано ниже:

Есть много записей. Каждая запись имеет около 250 полей. Каждое поле должно быть проверено по предопределенному правилу.

Поэтому я определил класс FieldInfo для представления каждого поля:

public class FieldInfo {
    private String name;
    private String value;
    private String error_code;
    private String error_message;

    // ignore getters and setters
} 

Запись класса для представления записи:

public class Record {
    List<FieldInfo> fields;

    // omit getter and setter here
}

, а также интерфейс и класс правил:

public interface BusinessRule {
    // validating one field needs some other fields' value in the same record. So the list of all fields for a certain record passed in as parameter 
    public FieldInfo validate(List<FieldInfo> fields);
}

public class FieldName_Rule implements BusinessRule {

    public FieldInfo validate(List<FieldInfo> fields) { 
    // will do 
    // 1. pickup those fields required for validating this target field, including this target field
    // 2. performs validation logics A, B, C... 

    // note: all rules only read data from a database, no update/insert operations. 
    }
}

Пользователь может отправлять 5000 или более записей одновременно для процесса. Требования к производительности высоки. Я думал о том, чтобы иметь несколько потоков для отправленных, например, 5000 записей (означает, что один поток запускает несколько записей), и в каждом потоке разветвлять еще несколько потоков в каждой записи для запуска правил.

Но, к сожалению, такая встроенная многопоточность всегда умирала в моем случае.

Вот некоторые ключевые части из приведенного выше решения:

public class BusinessRuleService {

    @Autowired
    private ValidationHandler handler;

    public String process(String xmlRequest) {
        List<Record> records = XmlConverter.unmarshall(xmlRequest).toList();
        ExecutorService es = Executors.newFixedThreadPool(100);
        List<CompletableFuture<Integer> futures = 
                records.stream().map(r->CompletableFuture.supplyAsync(()-> handler.invoke(r), es)).collect(Collectors.toList());
        List<Integer> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.println("total records %d processed.", result.size());
        es.shutdown();
        return XmlConverter.marshallObject(records);
    }
}

@Component
public class ValidationHandlerImpl implements ValidationHandler {

    @Autowired
    private List<BusinessRule> rules;

    @Override
    public int invoke(Record record) {

        ExecutorService es = Executors.newFixedThreadPool(250);
        List<CompletableFuture<FieldInfo> futures = 
                rules.stream().map(r->CompletableFuture.supplyAsync(()-> r.validate(record.getFields()), es)).collect(Collectors.toList());
        List<FieldInfo> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.println("total records %d processed.", result.size());
        es.shutdown();
        return 0;
    }
}

Рабочий процесс: Пользователь отправляет список записей в xml строковом формате. Одна из конечных точек приложения запускает метод процесса в объекте BusinessRuleService. Процесс использует CompletableFuture для составления задач и передачи задач в ExecutorService, который имеет пул потоков размером 100. Каждая задача в списке CompletableFuture затем запускает объект ValidationHandler. Объект ValidationHandler создает другую задачу CompletableFuture и передает задачу другому ExecutorService, размер пула которого совпадает с размером списка правил.

Приведенное выше решение является правильным?

Примечание: мое текущее решение: отправленные записи обрабатываются последовательно. И 250 правил обрабатываются параллельно для каждой записи. С этим решением требуется более 2 часов для 5000 записей. Такая низкая производительность неприемлема для бизнеса.

Я очень новичок в параллельном / многопоточном программировании. Большое спасибо за все виды помощи!

1 Ответ

0 голосов
/ 21 февраля 2020

Это хорошо известная модель «один производитель - несколько потребителей». Решение classi c состоит в том, чтобы создать BlockingQueue<Record> queue и поместить туда записи в темпе их чтения. На другом конце очереди несколько рабочих потоков читают записи из queue и обрабатывают их (в нашем случае проверяют поля):

class ValidatingThread extends Tread {
   BlockingQueue<Record> queue;
   FieldName_Rule validator = new FieldName_Rule();

   public Validator (BlockingQueue<Record> queue) {
      this.queue = queue;
   }

   public void run() {
      Record record = queue.take();
      validator.validate(collectFields(record));
   }
}

Оптимальное количество потоков равно Runtime.getRuntime().availableProcessors(). Запустите их все в начале и не используйте «встроенную многопоточность». Задача, как остановить потоки после обработки всех записей, оставлена ​​в качестве учебного задания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...