Как использовать MDC с пулами потоков? - PullRequest
121 голосов
/ 20 мая 2011

В нашем программном обеспечении мы широко используем MDC для отслеживания таких вещей, как идентификаторы сеансов и имена пользователей для веб-запросов. Это прекрасно работает во время работы в оригинальном потоке. Тем не менее, есть много вещей, которые нужно обрабатывать в фоновом режиме. Для этого мы используем классы java.concurrent.ThreadPoolExecutor и java.util.Timer вместе с некоторыми автоматически выполняемыми службами асинхронного выполнения. Все эти службы управляют своим собственным пулом потоков.

Вот что Руководство Logback говорит об использовании MDC в такой среде:

Копия сопоставленного диагностического контекста не всегда может быть унаследована рабочими потоками из инициирующего потока. Это тот случай, когда java.util.concurrent.Executors используется для управления потоками. Например, метод newCachedThreadPool создает ThreadPoolExecutor и, как и другой код пула потоков, имеет сложную логику создания потоков.

В таких случаях рекомендуется, чтобы MDC.getCopyOfContextMap () вызывался в исходном (главном) потоке перед отправкой задачи исполнителю. Когда задача выполняется в качестве первого действия, она должна вызвать MDC.setContextMapValues ​​(), чтобы связать сохраненную копию исходных значений MDC с новым управляемым потоком Executor.

Это было бы хорошо, но очень легко забыть добавить эти вызовы, и нет простого способа распознать проблему, пока не станет слишком поздно. Единственным признаком Log4j является то, что вы пропускаете информацию MDC в журналах, а с помощью Logback вы получаете устаревшую информацию MDC (поскольку поток в пуле протекторов наследует свой MDC от первой задачи, которая была на нем запущена). Оба являются серьезными проблемами в производственной системе.

Я не вижу в этом особой ситуации, но я не мог найти много информации об этой проблеме в Интернете. Очевидно, что это не то, с чем сталкиваются многие люди, поэтому должен быть способ избежать этого. Что мы здесь делаем не так?

Ответы [ 6 ]

68 голосов
/ 12 октября 2013

Да, это общая проблема, с которой я столкнулся.Есть несколько обходных путей (например, ручная установка, как описано), но в идеале вам нужно решение, которое

  • устанавливает MDC последовательно;
  • Предотвращает неявные ошибки, если MDC неверенно вы этого не знаете;и
  • Минимизирует изменения в том, как вы используете пулы потоков (например, создание подклассов Callable с MyCallable везде или похожим уродством).

Вот решение, которое я использую, которое удовлетворяет этимтри потребности.Код должен быть понятен.

(Примечание: этот исполнитель может быть создан и передан в MoreExecutors.listeningDecorator() Гуавы, если вы используете ListanableFuture. Гуавы).

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
25 голосов
/ 20 июня 2011

Мы столкнулись с подобной проблемой. Возможно, вы захотите расширить ThreadPoolExecutor и переопределить методы before / afterExecute, чтобы сделать необходимые вызовы MDC перед запуском / остановкой новых потоков.

12 голосов
/ 12 апреля 2018

ИМХО, лучшее решение:

  • использовать ThreadPoolTaskExecutor
  • реализовать свой собственный TaskDecorator
  • использовать его: executor.setTaskDecorator(new LoggingTaskDecorator());

Декоратор может выглядеть так:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}
2 голосов
/ 10 января 2018

Вот как я это делаю с фиксированными пулами потоков и исполнителями:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

В поточной части:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});
2 голосов
/ 12 сентября 2016

Подобно ранее опубликованным решениям, методы newTaskFor для Runnable и Callable могут быть перезаписаны для переноса аргумента (см. Принятое решение) при создании RunnableFuture.

Примечание. Следовательно, метод executorService submit должен вызываться вместо метода execute.

Для ScheduledThreadPoolExecutor методы decorateTask будут перезаписаны.

0 голосов
/ 22 мая 2017

Я смог решить эту проблему, используя следующий подход

В основном потоке (Application.java, точка входа моего приложения)

static public Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

В методе run класса, который вызываетсяИсполнитель

MDC.setContextMap(Application.mdcContextMap);
...