Как перехватить создание и выполнение Runnable под главным потоком, чтобы заполнить данные потока контекста, используя AspectJ - PullRequest
0 голосов
/ 19 сентября 2019

Исходная проблема

Заполнение Java MDC из потока во все его порожденные внутренние потоки (отношение родительских и дочерних элементов)

Решение WIP с использованием AspectJ

Я могу написать аспект, перехватывающий все Runnable создания, но так как я хочу разные экземпляры аспектов для каждого использования (с пользовательской аннотацией), так как я должен где-то хранить MDC при выполнении кодаиз родительского потока я не могу написать pointcut, перехватывающий вновь созданный экземпляр Runnable, поэтому я могу установить MDC, используя предыдущую контекстную карту.

Вот аспект

@Aspect("percflow(@annotation(com.bell.cts.commons.cron.framework.scheduler.domain.MDCTrace))")
public class MDCTraceAspect {

  private final Logger logger = LoggerFactory.getLogger(MDCTraceAspect.class);
  private int i;
  private final Map<String, String> contextMap;

  public MDCTraceAspect() {
    i = new Random().nextInt();
    MDC.clear();
    MDC.put("IP", String.valueOf(i));
    contextMap = MDC.getCopyOfContextMap();
    logger.debug(String.format("[%d] New Aspect", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..))")
  public void beforeNewRunnable(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.debug(String.format("[%d] New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(* Runnable+.*(..))")
  public void before(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] RUNNABLE WORKS!", Thread.currentThread().getId()));
  }

  @Before("execution(void Child.run())")
  public void beforeChildRun(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] CHILD WORKS!", Thread.currentThread().getId()));
  }
}

А вот Parent, Child и пользовательская аннотация

public class Parent {

  private final Logger logger = LoggerFactory.getLogger(Parent.class);
  private ExecutorService executorService;

  @MDCTrace
  public void runMultiThreadByExecutor() throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    logger.info(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(new Child());
    logger.info(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      logger.info(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    logger.info(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    parent.runMultiThreadByExecutor();
  }
}
public class Child implements Runnable {

  private final Logger logger = LoggerFactory.getLogger(Child.class);

  @Override
  public void run() {
    logger.info(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {
}

Цель

Конечная цель - простоаннотировать точку входа контекста MDC, чтобы любые потоки / runnables / futures, созданные во время выполнения аннотированного метода (даже другие объекты), были перехвачены, поэтому MDC правильно установлен с использованием информации MDC об исходном / родительском потоке, хранящейся в aspect экземпляр текущего потока контекста.

Оба предположения before и beforeChildRun не работают, и я не могу найти способ заставить одного из них работать.

Спасибо

Бонусный балл , если кто-то может подсказать мне, как сделать так, чтобы это работало и для parallelStream.

1 Ответ

2 голосов
/ 19 сентября 2019

Прежде всего вам необходимо понять, что новый поток не входит в поток управления своего родительского потока.Смотрите мои другие ответы для объяснения вкл.Пример кода и журнал консоли:

Таким образом, все, что связано с cflow() или созданием аспекта percflow()в этом случае, как вы уже заметили, работать не будет.

Единственный способ получить часть того, что вам нужно - по крайней мере, для ваших собственных классов, если вы используете ткачество во время компиляции, а также для сторонних JAR-файлов/ classes (кроме классов JRE), если вы используете ткачество во время загрузки - это ручная бухгалтерия.

Посмотрите на этот пример, я немного изменил ваш собственный код, чтобы показать обходной путь и его ограничения.Я также хотел избежать использования каких-либо каркасов и вместо этого печатаю в System.outПоэтому мне пришлось заменить MDC на фиктивный класс, чтобы код скомпилировался.

package de.scrum_master.app;

import java.util.HashMap;
import java.util.Map;

public class MDC {
  private static ThreadLocal<Map<String, String>> contextMap = new InheritableThreadLocal<>();

  static {
    clear();
  }

  public static void clear() {
    contextMap.set(new HashMap<>());
  }

  public static void put(String key, String value) {
    contextMap.get().put(key, value);
  }

  public static Map<String, String> getCopyOfContextMap() {
    return new HashMap<>(contextMap.get());
  }

  public static void setContextMap(Map<String, String> contextMap) {
    MDC.contextMap.set(contextMap);
  }
}
package de.scrum_master.app;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {}
package de.scrum_master.app;

public class Child implements Runnable {
  @Override
  public void run() {
    System.out.println(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}
package de.scrum_master.app;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Parent {
  private ExecutorService executorService;

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  private Runnable doStuff() throws InterruptedException {
    return doStuff(new Child());
  }

  private Runnable doStuff(Runnable runnable) throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    System.out.println(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(runnable);
    System.out.println(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      //System.out.println(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
    System.out.println("\n----------------------------------------\n");
    return runnable;
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    System.out.println("MDCTrace annotation");
    parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation");
    parent.runMultiThreadByExecutorPlain();

    Runnable runnable = new Child();
    System.out.println("MDCTrace annotation (runnable created outside of control flow)");
    parent.runMultiThreadByExecutorAnnotated(runnable);
    System.out.println("No annotation (re-use runnable created outside of control flow)");
    parent.runMultiThreadByExecutorPlain(runnable);

    System.out.println("MDCTrace annotation (save returned runnable)");
    runnable = parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation (re-use returned runnable)");
    parent.runMultiThreadByExecutorPlain(runnable);
}
}

Как видите, у меня есть положительный и отрицательный тестпример (с аннотацией @MDCTrace и без нее) и три случая для каждого из них:

  1. Создание исполняемых объектов внутри потока управления аннотированного (или неаннотированного) метода, как вы делаете в своем собственном примере.
  2. Создание исполняемых объектов вне потока управления аннотированного (или неаннотированного) метода и передача их по ссылке в поток управления.
  3. Создание первого запускаемого объекта в потоке управления аннотированногометод, возвращая его и передавая его в поток управления неаннотированного метода.

Числа 2 и 3 предназначены для демонстрации ограничений последующего аспектного подхода, который в основном состоит в ведении ручного учетавсе Runnable экземпляры, созданные в потоке управления аннотированного метода.

package de.scrum_master.aspect;

import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

import de.scrum_master.app.MDC;

@Aspect
public class MDCTraceAspect {
  private static final Random RANDOM = new Random(); 
  private Map<String, String> contextMap;
  private Set<Runnable> runnables = new HashSet<>();

  @Pointcut("@annotation(de.scrum_master.app.MDCTrace) && execution(* *(..))")
  private static void entryPoint() {}

  @Before("entryPoint()")
  public void executeEntryPoint() {
    MDC.clear();
    MDC.put("IP", String.valueOf(RANDOM.nextInt()));
    contextMap = MDC.getCopyOfContextMap();
    System.out.println(String.format("[%d] * Entry point", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..)) && cflow(entryPoint()) && target(runnable)")
  public void beforeNewRunnable(JoinPoint joinPoint, Runnable runnable) {
    runnables.add(runnable);
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(public void Runnable+.run(..)) && target(runnable)")
  public void beforeRunnableExecution(JoinPoint joinPoint, Runnable runnable) {
    if (!runnables.contains(runnable))
      return;
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * Runnable started", Thread.currentThread().getId()));
  }
}

Это приводит к следующему журналу консоли(разбито на 3 части):


  1. Создание исполняемых объектов в потоке управления аннотированного (или неаннотированного) метода, как вы делаете в своем собственном примере:
MDCTrace annotation
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[12] * Runnable started
[12] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation
[1] Before start child thread
[1] After start child thread
[13] Running in the child thread
[1] ExecutorService is over

----------------------------------------

Это работает так, как вы могли бы ожидать.Здесь нет сюрпризов.


Создание исполняемых объектов вне потока управления аннотированного (или неаннотированного) метода, передавая их по ссылке в поток управления:
MDCTrace annotation (runnable created outside of control flow)
[1] * Entry point
[1] Before start child thread
[1] After start child thread
[14] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use runnable created outside of control flow)
[1] Before start child thread
[1] After start child thread
[15] Running in the child thread
[1] ExecutorService is over

----------------------------------------

Как видите, здесь не выводится журналпосле того, как точка входа была достигнута.Это не то, что вы могли бы хотеть, но исполняемый файл был создан вне потока управления и передан, так что аспект здесь не запускается.


Создание первого запускаемого объекта в потоке управления аннотированного метода, его возврат и передача в поток управления неаннотированного метода:
MDCTrace annotation (save returned runnable)
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[16] * Runnable started
[16] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use returned runnable)
[1] Before start child thread
[1] After start child thread
[17] * Runnable started
[17] Running in the child thread
[1] ExecutorService is over

----------------------------------------

Здесь часть A похожа надело №.1, но часть B также печатает строку журнала для неаннотированного метода, поскольку экземпляр Runnable был зарегистрирован в бухгалтерии аспекта во время потока управления аннотированного метода.Итак, здесь вы видите строку журнала, которую вы, вероятно, скорее всего хотите избежать.

Итак, каков вывод здесь?Идеального решения не существует, вам нужно проверить свой код и какие у вас там дела, а затем спроектировать аспект, чтобы учесть эти случаи.Если у вас нет дел, подобных тем, которые я выдумал, нет.2 и 3, мой подход работает.

Некоторые другие замечания:

  • Остерегайтесь разницы Runnable с и Thread с.Они не одинаковы, вы можете повторно использовать один и тот же runnable в нескольких потоках.Кроме того, вы также можете повторно использовать потоки, например, используя пулы потоков.Так что это может быть сколь угодно сложным.Каждый запускаемый поток или поток, помеченный как цель для вашего аспекта, может быть позже использован повторно в контексте, который вы не хотите регистрировать.
  • Для параллельных потоков или других случаев, в которых исполняемые файлы создаются самой JRE, это никогда не будет работать, потому что исполняемые объекты и потоки, созданные внутренними классами JRE, не подлежат переплетению аспектов ни во время компиляции, ни в загрузке.время ткачества.Теоретически вы можете втиснуть код аспекта в JRE или JDK, создавая новые JAR-файлы из тканых классов и заменяя оригиналы или добавляя их в путь к загрузочному классу.Но это немного сложно, и вам действительно нужно контролировать среду выполнения вашего приложения, чтобы запустить JVM с правильными параметрами.Я делал это раньше, и это работает, но это не для начинающих и, вероятно, плохая идея для начала.
...