Spring boot rabbitMq setExceptionHandler - PullRequest
       40

Spring boot rabbitMq setExceptionHandler

0 голосов
/ 16 октября 2018

Как я могу преобразовать следующий код с помощью среды Spring?

ConnectionFactory factory = new ConnectionFactory();
factory.setExceptionHandler(new BrokerExceptionHandler(logger, instance));

public final class BrokerExceptionHandler extends StrictExceptionHandler {
   @Override
   public void handleReturnListenerException(Channel channel, Throwable exception) {
        logger.log(Level.SEVERE, "ReturnListenerException detected: ReturnListener.handleReturn", exception);
        this.publishAlert(exception, "ReturnListener.handleReturn");
        logger.log(Level.SEVERE, "Close application", exception);
        System.exit(-1);
   }
  ....
}

В основном мне нужно указать пользовательский обработчик исключений, если возникает исключение rabbitMQ, а затем остановить приложение

Какя могу публиковать сообщение rabbitMq каждый раз, когда возникает исключение?

РЕДАКТИРОВАТЬ

Я изменил свой класс конфигурации следующим образом:

@Bean
SimpleMessageListenerContainer containerPredict(ConnectionFactory connectionFactory,
  MessageListenerAdapter listenerPredictAdapter) {
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
     container.setConnectionFactory(connectionFactory);
     container.setDefaultRequeueRejected(false);
     container.setErrorHandler(new BrokerExceptionHandler());
     container.setQueueNames(getQueueName());
     container.setMessageListener(listenerAdapter);
     return container;
}

а это мой класс BrokerExceptionHandler

public class BrokerExceptionHandler implements ErrorHandler {
   private final Logger logger = Logger.getLogger(getClass().getSimpleName());

   @Autowired
   private Helper helper;

   @Override
   public void handleError(Throwable t) {
     logger.log(Level.SEVERE, "Exception Detected. Publishing error alert");
     String message = "Exception detected. Message: " + t.getMessage());

     // Notify the error to the System sending a new RabbitMq message
     System.out.println("---> Before convertAndSend");
     rabbitTemplate.convertAndSend(exchange, routing, message);
     System.out.println("---> After convertAndSend");
   }
}

Я вижу журнал Exception Detected. Publishing error alert и ---> Before convertAdnSend в консоли, но новое предупреждение не публикуется, а журнал ---> After convertAndSend не отображается вконсоль.

Вот это журнал:

2018-10-17 09: 32: 02.849 ОШИБКА 1506 --- [tainer **** - 1] BrokerExceptionHandler:Обнаружено исключение.Оповещение об ошибке публикации

---> Перед convertAndSend

2018-10-17 09: 32: 02.853 INFO 1506 --- [tainer **** - 1] osarlSimpleMessageListenerContainer: перезапуск Consumer @4f5b08d: tags = [{amq.ctag-yUcUmg5BCo20ucG1wJZoWA = myechange}], channel = Кэшированный канал кролика: AMQChannel (amqp: //admin@XXX.XXX.XXX.XXX: 5672 / testbed_simulators, 1), cond: 39Соединение с общим кроликом: SimpleConnection @ 61f39bb [делегат = amqp: //admin@XXX.XXX.XXX.XXX: 5672 / testbed_simulators, localPort = 51528] ,cknowledgeMode = AUTO, размер локальной очереди = 0

2018-10-17 09: 32: 02.905 ИНФОРМАЦИЯ 1506 --- [tainer **** - 2] osamqp.rabbit.core.RabbitAdmin: автоматическое объявление недолговременного, автоматического удаления или эксклюзивной очереди (myexchange) длительного: false, автоудаление: правда, эксклюзив: правда.Он будет повторно объявлен, если посредник остановится и будет перезапущен, пока фабрика соединений активна, но все сообщения будут потеряны.

2018-10-17 09: 32: 02.905 INFO 1506 --- [tainer **** - 2] osamqp.rabbit.core.RabbitAdmin: автоматическое объявление долговременной, автоматически удаляемой или исключительной очереди (myexchange) длительной: false, автоудаление: true, exclusive: true.Он будет объявлен повторно, если посредник остановится и будет перезапущен, пока фабрика соединений активна, но все сообщения будут потеряны.

РЕДАКТИРОВАТЬ

Отладка Я вижуперед отправкой нового сообщения вызывается следующий код:

File: SimpleMessageListenerContainer.class line 1212

if (!isActive(this.consumer) || aborted) {
  .....
}
else {
  ---> logger.info("Restarting " + this.consumer);
       restart(this.consumer);
}

РЕДАКТИРОВАТЬ 2

Пример кода: http://github.com/fabry00/spring-boot-rabbitmq

1 Ответ

0 голосов
/ 16 октября 2018

Это зависит от того, как вы делаете свою конфигурацию;если вы используете автоматически сконфигурированную фабрику соединений Spring Boot ...

@Bean
public InitializingBean connectionFactoryConfigurer(CachingConnectionFactory ccf) {
    return () -> ccf.getRabbitConnectionFactory().setExceptionHandler(...);
}

Если вы подключаете собственные бины (например, через RabbitConnectionFactoryBean), то установите их напрямую.

EDIT

Вы добавляете NullPointerException в свой обработчик ошибок ...

2018-10-17 11:51:58.733 DEBUG 38975 --- [containerKpis-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.NullPointerException: null
    at com.test.BrokerExceptionHandler.handleError(BrokerExceptionHandler.java:27) ~[main/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1243) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1488) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1318) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

2018-10-17 11:51:58.734  INFO 38975 --- [containerKpis-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@1aabf50d: tags=[{amq.ctag-VxxHKiMsWI_w8DIooAsySA=myapp.mydomain.KPIS}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@b88a7d6 Shared Rabbit Connection: SimpleConnection@25dc64a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55662], acknowledgeMode=AUTO local queue size=0

Чтобы включить ведение журнала DEBUG, добавьте

logging.level.org.springframework.amqp=debug

to application.properties.

this.helper равно null, поскольку обработчик ошибок не является Spring Bean - @Autowired работает только в том случае, если Spring управляет объектом;вы используете new BrokerExceptionHandler().

EDIT2

Я добавил эти 2 боба

@Bean
public BrokerExceptionHandler errorHandler() {
    return new BrokerExceptionHandler();
}

@Bean
public MessageConverter json() { // Boot auto-configures in template
    return new Jackson2JsonMessageConverter();
}

и теперь ...

---> Before publishing Alert event
--- ALERT
2018-10-17 12:14:45.304  INFO 43359 --- [containerKpis-1] Helper                                   : publishAlert
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$638/975724213 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@77f3f419 Shared Rabbit Connection: SimpleConnection@10c86af1 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56220]
2018-10-17 12:14:45.321 DEBUG 43359 --- [containerKpis-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'{"timestamp":1539792885303,"code":"ERROR","severity":"ERROR","message":"Exception detected. Message: Listener method 'kpisEvent' threw exception"}' MessageProperties [headers={sender=myapp, protocolVersion=1.0.0, senderType=MY_COMPONENT_1, __TypeId__=com.test.domain.Alert, timestamp=1539792885304}, contentType=application/json, contentEncoding=UTF-8, contentLength=146, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [myevent.ALERT], routingKey = [/]
--- ALERT 2
---> After publishing Alert event
2018-10-17 12:14:45.323 DEBUG 43359 --- [pool-1-thread-6] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for consumerTag: 'amq.ctag-eYbzZ09pCw3cjdtSprlZMQ' with deliveryTag: '1' in Consumer@4b790d86: tags=[{amq.ctag-eYbzZ09pCw3cjdtSprlZMQ=myapp.myevent.ALERT}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@77f3f419 Shared Rabbit Connection: SimpleConnection@10c86af1 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56220], acknowledgeMode=AUTO local queue size=0
2018-10-17 12:14:45.324 DEBUG 43359 --- [ontainerReset-1] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'{"timestamp":1539792885303,"code":"ERROR","severity":"ERROR","message":"Exception detected. Message: Listener method 'kpisEvent' threw exception"}' MessageProperties [headers={sender=myapp, protocolVersion=1.0.0, senderType=MY_COMPONENT_1, __TypeId__=com.test.domain.Alert, timestamp=1539792885304}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myevent.ALERT, receivedRoutingKey=/, deliveryTag=1, consumerTag=amq.ctag-eYbzZ09pCw3cjdtSprlZMQ, consumerQueue=myapp.myevent.ALERT])
2018-10-17 12:14:45.324  INFO 43359 --- [ontainerReset-1] Application                              : ---> kpisAlert RECEIVED
2018-10-17 12:14:45.325 ERROR 43359 --- [ontainerReset-1] Application                              : ---> Message: Exception detected. Message: Listener method 'kpisEvent' threw exception
2018-10-17 12:14:45.326 DEBUG 43359 --- [containerKpis-1] o.s.a.r.listener.BlockingQueueConsumer   : Rejecting messages (requeue=false)

EDIT3

Или, если вы предпочитаете Gson ...

  @Bean
  public MessageConverter json() {
    Gson gson = new GsonBuilder().create();
    return new MessageConverter() {

      @Override
      public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(gson.toJson(object).getBytes(), messageProperties);
      }

      @Override
      public Object fromMessage(Message message) throws MessageConversionException {
        throw new UnsupportedOperationException();
      }

    };
  }

EDIT4

Я изменилтекущая версия вашего приложения выглядит следующим образом:

  @Bean
  public MessageConverter jsonConverter() {
    Gson gson = new GsonBuilder().create();
    EventKpisCollected collected = new EventKpisCollected();
    return new MessageConverter() {

      @Override
      public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        System.out.println("toMessage");
        return new Message(gson.toJson(object).getBytes(), messageProperties);
      }

      @Override
      public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("fromMessage");
        return collected.decode(new String(message.getBody()));
      }
    };
  }

...

  @Bean
  SimpleMessageListenerContainer containerKpis(ConnectionFactory connectionFactory,
      MessageListenerAdapter listenerKpisAdapter) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDefaultRequeueRejected(false);
    container.setErrorHandler(errorHandler());
    container.setQueueNames(getQueueKpis());
    container.setMessageListener(listenerKpisAdapter);
    return container;
  }

  @Bean
  SimpleMessageListenerContainer containerReset(ConnectionFactory connectionFactory,
      MessageListenerAdapter listenerAlertAdapter) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDefaultRequeueRejected(false);
    container.setErrorHandler(errorHandler());
    container.setQueueNames(getQueueAlert());
    container.setMessageListener(listenerAlertAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerKpisAdapter(Application receiver) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "kpisEvent");
    messageListenerAdapter.setMessageConverter(jsonConverter());
    return messageListenerAdapter;
  }

  @Bean
  MessageListenerAdapter listenerAlertAdapter(Application receiver) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "alertEvent");
//    messageListenerAdapter.setMessageConverter(jsonConverter()); converter only handles events.
    return messageListenerAdapter;
  }

и

fromMessage
2018-10-19 13:46:53.734  INFO 10725 --- [containerKpis-1] Application                              : ---> kpisEvent RECEIVED
2018-10-19 13:46:53.734  INFO 10725 --- [containerKpis-1] Application                              : ---> kpisEvent DECODED, windowId: 1522751098000-1522752198000

С декодированием события, выполненным платформой (только для события в настоящее время - вам потребуется второй конвертер дляАлерс).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...