Spring Sleuth - неработающая трассировка на JMS ErrorHandler - PullRequest
1 голос
/ 05 августа 2020

У меня есть простой пример https://github.com/gtiwari333/sleuth-jms-broken-tracing/tree/master, который использует Spring Sleuth с JMS.

Здесь вызов /jms конечной точки ставит сообщение в очередь и при получении сообщения на onMessage, мы выполняем GET-вызов /test и бросаем MyException. Мы ожидаем, что идентификатор трассировки будет передан в ErrorHandler, поэтому мы увидим тот же идентификатор трассировки в журнале между конечными точками /jms, onMessage(), handleError() и /test.

Что я получаю сейчас / Как получить ошибку:

Я запустил приложение и попал в конечную точку localhost:8080/jms. В журнале ниже TraceId не распространяется в классе JmsListenerErrorHandler, а новый TraceId, созданный для вызова GET, на /test

2020-08-04 17:55:24.212  INFO [,225c47fb814f6584,225c47fb814f6584,true] 16956 --- [nio-8080-exec-1] sleuth.SleuthApplication                 : Queuing message ...
2020-08-04 17:55:24.282  INFO [,225c47fb814f6584,eac851f1650ae8a6,true] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : JMS message received SOME MESSAGE !!!
2020-08-04 17:55:24.321  INFO [,225c47fb814f6584,612a7956f6b29a01,true] 16956 --- [nio-8080-exec-3] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< FINE UPTO HERE
2020-08-04 17:55:24.332  INFO [,,,] 16956 --- [enerContainer-1] sleuth.SleuthApplication                 : handling error by calling another endpoint ..     
<<<<<<<<< new thread started and lost tracing
2020-08-04 17:55:24.336  INFO [,4c163d0997076729,4c163d0997076729,true] 16956 --- [nio-8080-exec-2] sleuth.SleuthApplication                 : test1 called  
<<<<<<<<< new trace id received

Похоже, JMS обрабатывает получение / обработку новых сообщений в новая ветка. У Sleuth есть необходимый 'инструментальный' logi c для перехвата и распространения идентификаторов Trace / Span на код @JmsListener, но он не распространяется на org.springframework.util.ErrorHandler.

  • org.springframework. jms.listener.DefaultMessageListenerContainer.AsyncMessageListenerInvoker
  • org.springframework.jms.listener.AbstractPollingMessageListenerContainer # doReceiveAndExecute

Код RestController и @JmsListener:

@RestController
static class Ctrl {

    @Autowired RestTemplate restTemplate;
    @Autowired JmsTemplate jmsTemplate;

    @GetMapping("/test")
    void test() {
        log.info("test1 called");
    }

    @GetMapping("/jms")
    void jms() {
        log.info("Queuing message ...");
        jmsTemplate.convertAndSend("test-queue", "SOME MESSAGE !!!");
    }

    @JmsListener(destination = "test-queue", concurrency = "5")
    void onMessage(TextMessage message) throws JMSException {
        log.info("JMS message received {}", message.getText());
        restTemplate.getForEntity("http://localhost:8080/test", Void.class); //-->it works
        throw new MyException("Some Error");  //-->it doesn't
    }
    static class MyException extends RuntimeException {
        public MyException(String msg) { super(msg); }
    }
}

Обработчик ошибок:

@Component
static class JmsListenerErrorHandler implements ErrorHandler {

    @Autowired RestTemplate restTemplate;

    @Override
    public void handleError(Throwable t) {
        log.info("handling error by calling another endpoint .."); //1....tracing is lost here
        restTemplate.getForEntity("http://localhost:8080/test", Void.class);
    }
}

Конфигурация JMS:

@Configuration
@EnableJms
static class ActiveMqConfig implements JmsListenerConfigurer {

    @Autowired ErrorHandler jmsListenerErrorHandler;

    @Autowired ConnectionFactory connectionFactory;

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(containerFactory());
    }

    @Bean
    JmsListenerContainerFactory<?> containerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(jmsListenerErrorHandler);
        return factory;
    }
}

Что я пробовал: (чтобы сделать это полный вопрос SO)

Его в PR: https://github.com/gtiwari333/sleuth-jms-broken-tracing/pull/1/files Здесь я попытался использовать создать собственный bean-компонент Executor, обернутый LazyTraceThreadPoolTaskExecutor, и попытался передать его to JmsListenerContainerFactory

Он работает для обычного выполнения потока, но не для материала JMS.

executor.execute(() -> log.info("Im inside thread 2")); //it works

Кто-то уже понял, как перехватить ErrorHandler, чтобы передать TraceId?

1 Ответ

2 голосов
/ 05 августа 2020

Имеется открытый вопрос , касающийся инструментария @JmsListener. Поэтому я полагаю, что на данный момент не поддерживается.

Возможное решение - передать Span в исключении:

@RestController
static class Ctrl {
    @Autowired
    private Tracer tracer;
    // ...
    @JmsListener(destination = "test-queue", concurrency = "5")
    void onMessage(TextMessage message) throws JMSException{
        //..
        throw new MyException("Some Error",tracer.currentSpan()); // <-- pass current span
    }
}

Таким образом, вы можете получить его в JmsListenerErrorHandler:

@Override
public void handleError(Throwable t) {
    if(t.getCause() instanceof MyException){
        MyException mEx = (MyException) t.getCause();
        log.info("Failing span: {}",mEx.getSpan());
    }
    //...
}

MyException класс:

class MyException extends RuntimeException {
    private final Span span;
    public MyException(String msg, Span span) {
        super(msg);
        this.span=span;
    }
    // Getter for the Span
}
...