Распространение ThreadLocal на новый поток, извлеченный из ExecutorService - PullRequest
28 голосов
/ 31 августа 2011

Я запускаю процесс в отдельном потоке с таймаутом, используя ExecutorService и Future (пример кода здесь ) (поток "порождение" происходит в аспекте AOP).

Теперь основной поток - это запрос Resteasy . Resteasy использует одну или несколько переменных ThreadLocal для хранения некоторой контекстной информации, которую мне нужно получить в какой-то момент моего вызова метода Rest. Проблема в том, что поток Resteasy работает в новом потоке, переменные ThreadLocal теряются.

Каков наилучший способ «распространения» любой переменной ThreadLocal, используемой Resteasy, в новый поток? Похоже, что Resteasy использует более одной переменной ThreadLocal для отслеживания контекстной информации, и я хотел бы «вслепую» перенести всю информацию в новый поток.

Я рассмотрел подкласс ThreadPoolExecutor и использовал метод beforeExecute для передачи текущего потока в пул, но не смог найти способ передачи переменных ThreadLocal в пул.

Есть предложения?

Спасибо

Ответы [ 6 ]

20 голосов
/ 31 августа 2011

Набор ThreadLocal экземпляров, связанных с потоком, хранится в закрытых элементах каждого Thread.Ваш единственный шанс перечислить их - подумать о Thread;таким образом, вы можете переопределить ограничения доступа к полям потока.

Как только вы сможете получить набор ThreadLocal, вы можете скопировать в фоновые потоки, используя beforeExecute() и afterExecute() перехватчики ThreadPoolExecutor, или создавая оболочку Runnable для ваших задач, которая перехватывает вызов run() для установки отмены необходимых ThreadLocal экземпляров.На самом деле последний метод может работать лучше, поскольку он даст вам удобное место для хранения значений ThreadLocal во время постановки задачи в очередь.


Обновление: Вотболее конкретная иллюстрация второго подхода.Вопреки моему первоначальному описанию, все, что хранится в оболочке, это вызывающий поток, который запрашивается при выполнении задачи.

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}
3 голосов
/ 14 февраля 2017

На основе ответа @erickson я написал этот код.Это работает для унаследованныхThreadLocals.Он строит список наследуемыхThreadLocals, используя тот же метод, который используется в конструкторе потоков.Конечно, я использую рефлексию, чтобы сделать это.Также я переопределяю класс исполнителя.

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

Wrapper:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

метод копирования:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }
3 голосов
/ 31 августа 2011

Как я понимаю вашу проблему, вы можете взглянуть на InheritableThreadLocal , который предназначен для передачи ThreadLocal переменных из контекста родительского потока в контекст дочернего потока

0 голосов
/ 01 ноября 2017

Мне не нравится подход Reflection.Альтернативным решением было бы реализовать обертку executor и передать объект напрямую как ThreadLocal контекст всем дочерним потокам, распространяющим родительский контекст.

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }
0 голосов
/ 17 мая 2017

Вот пример для передачи текущего LocaleContext в родительском потоке в дочерний поток, охватываемый CompletableFuture [по умолчанию он использовал ForkJoinPool].

Просто определите все, что вы хотели сделать в дочернем потоке внутри блока Runnable. Поэтому, когда CompletableFuture выполняет блок Runnable, его дочерний поток контролирует и вуаля, у вас есть родительский поток ThreadLocal, установленный в дочернем ThreadLocal.

Проблема здесь не в том, что весь ThreadLocal скопирован. Только LocaleContext копируется. Поскольку ThreadLocal имеет частный доступ только к потоку, он тоже принадлежит с помощью Reflection, и попытка получить и установить его в Child - слишком много дурацких вещей, которые могут привести к утечкам памяти или снижению производительности.

Так что, если вы знаете интересующие вас параметры из ThreadLocal, то это решение работает намного чище.

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}
0 голосов
/ 31 августа 2011

Если вы посмотрите на код ThreadLocal, вы увидите:

    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

текущий поток не может быть перезаписан.

Возможные решения:

  1. Посмотрите на механизм вилки / соединения Java 7 (но я думаю, что это плохой путь)

  2. Посмотрите на одобренный механизм для перезаписи ThreadLocal класса в вашей JVM.

  3. Попробуйте переписать RESTEasy (вы можете использовать инструменты Refactor в вашей IDE, чтобы заменить все использование ThreadLocalэто выглядит просто)

...