Мы используем параллельный поток java 8 для обработки задачи, и мы отправляем задачу через ForkJoinPool # submit. Мы не используем ForkJoinPool.commonPool с расширением jvm, вместо этого мы создаем собственный настраиваемый пул для определения параллелизма и сохраняем его как переменную stati c.
У нас есть структура проверки, в которой мы используем список таблиц в список валидаторов, и мы отправляем это задание через настраиваемый ForkJoinPool следующим образом:
static ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
.getInstance().getTableValidator();
List<ValidationResult> result = forkJoinPool.submit(
() -> tables.stream()
.parallel()
.map(validator)
.filter(result -> result.getValidationMessages().size() > 0)
.collect(Collectors.toList())).get();
Проблема, с которой мы сталкиваемся, заключается в следующих компонентах: валидаторы, которые работают в отдельных потоках из нашего stati c ForkJoinPool, полагаются на tenant_id, который различается для каждого запроса и хранится в переменной InheritableThreadLocal. Поскольку мы создаем stati c ForkJoinPool, потоки, объединенные ForkJoinPool, наследуют только значение родительского потока, когда он создается в первый раз. Но эти объединенные потоки не будут знать новый tenant_id для текущего запроса. Таким образом, для последующего выполнения эти объединенные потоки используют старый tenant_id.
Я попытался создать собственный ForkJoinPool и указать ForkJoinWorkerThreadFactory в конструкторе и переопределить метод onStart для подачи нового tenant_id. Но это не работает, поскольку метод onStart вызывается только один раз во время создания, а не во время индивидуального выполнения.
Похоже, нам нужно что-то вроде ThreadPoolExecutor # beforeExecute, которое недоступно в случае ForkJoinPool. Итак, какая альтернатива у нас есть, если мы хотим передать локальное значение текущего потока в потоки, объединенные в статический пул? не хотел бы этого делать, чтобы избежать дорогостоящего создания потоков.
Какие у нас есть альтернативы?