java 8 параллельных потоков с ForkJoinPool и ThreadLocal - PullRequest
1 голос
/ 03 августа 2020

Мы используем параллельный поток java 8 для обработки задачи, и мы отправляем задачу через ForkJoinPool # submit. Мы не используем ForkJoinPool.commonPool с расширением jvm, вместо этого мы создаем собственный настраиваемый пул для определения параллелизма и сохраняем его как переменную stati c.

У нас есть структура проверки, в которой мы используем список таблиц в список валидаторов, и мы отправляем это задание через настраиваемый ForkJoinPool следующим образом:

static ForkJoinPool forkJoinPool = new ForkJoinPool(4);

List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
    .getInstance().getTableValidator();

List<ValidationResult> result = forkJoinPool.submit(
    () -> tables.stream()
                .parallel()
                .map(validator)
                .filter(result -> result.getValidationMessages().size() > 0)
                .collect(Collectors.toList())).get();

Проблема, с которой мы сталкиваемся, заключается в следующих компонентах: валидаторы, которые работают в отдельных потоках из нашего stati c ForkJoinPool, полагаются на tenant_id, который различается для каждого запроса и хранится в переменной InheritableThreadLocal. Поскольку мы создаем stati c ForkJoinPool, потоки, объединенные ForkJoinPool, наследуют только значение родительского потока, когда он создается в первый раз. Но эти объединенные потоки не будут знать новый tenant_id для текущего запроса. Таким образом, для последующего выполнения эти объединенные потоки используют старый tenant_id.

Я попытался создать собственный ForkJoinPool и указать ForkJoinWorkerThreadFactory в конструкторе и переопределить метод onStart для подачи нового tenant_id. Но это не работает, поскольку метод onStart вызывается только один раз во время создания, а не во время индивидуального выполнения.

Похоже, нам нужно что-то вроде ThreadPoolExecutor # beforeExecute, которое недоступно в случае ForkJoinPool. Итак, какая альтернатива у нас есть, если мы хотим передать локальное значение текущего потока в потоки, объединенные в статический пул? не хотел бы этого делать, чтобы избежать дорогостоящего создания потоков.

Какие у нас есть альтернативы?

Ответы [ 2 ]

1 голос
/ 05 августа 2020

Я нашел следующее решение, которое работает без изменения основного кода. В основном метод карты принимает функциональный интерфейс, который я представляю как лямбда-выражение. Это выражение добавляет ловушку preExecution для установки нового tenantId в текущем локальном потоке и очистки его в postExecution.

       forkJoinPool.submit(tables.stream()
                                 .parallel()
                                 .map((item) -> {
                                    preExecution(tenantId)
                                    try {
                                      return validator.apply(item);
                                    } finally {
                                      postExecution();
                                    }
                                  }   
                                 )
                                 .filter(validationResult -> 
                                   validationResult.getValidationMessages()
                                                   .size() > 0)
                                 .collect(Collectors.toList())).get();
0 голосов
/ 03 августа 2020

На мой взгляд, лучшим вариантом было бы избавиться от потока local и передать его вместо этого в качестве аргумента. Я понимаю, что это может быть масштабное мероприятие. Другой вариант - использовать оболочку.

Предполагая, что ваш валидатор имеет метод проверки, вы можете сделать что-то вроде:

public class WrappingModelValidator implements ModelValidator<Table. ValidationResult> {
    private final ModelValidator<Table. ValidationResult> v;
    private final String tenantId;

    public WrappingModelValidator(ModelValidator<Table. ValidationResult> v, String tenantId) {
        this.v = v;
        this.tenantId = tenantId;
    }

    public ValidationResult validate(Table t) {
      String oldValue = YourThreadLocal.get();
      YourThreadLocal.set(tenantId);
      try {
          return v.validate(t);
      } finally {
          YourThreadLocal.set(oldValue);
      }
    }
}

Затем вы просто оберните свой старый валидатор, и он установит локальный поток при входе и восстановите его, когда закончите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...