Как перебить сообщения RabbitMQ, когда в сервисе после исключения агрегатора происходит исключение - PullRequest
0 голосов
/ 03 апреля 2019

Я пытаюсь найти лучший способ обработки ошибок, которые могли возникнуть в службе, которая вызывается после истечения времени ожидания группы агрегата, которая имитирует тот же поток, как если бы было выполнено выражение releaseExpression.

Здесьмои настройки:

У меня есть AmqpInboundChannelAdapter, который принимает сообщения и отправляет их моему агрегатору.

Когда будет выполнено выражение releaseExpression и до истечения срока действия groupTimeout, если будет выдано исключениемой ServiceActivator, сообщения отправляются в мою очередь недоставленных сообщений для всех сообщений в этой группе сообщений.(10 сообщений в моем примере ниже, который используется только в иллюстративных целях) Это то, что я ожидал.

Если мое releaseExpression не было выполнено, но groupTimeout было выполнено, и время ожидания группы истекло, если исключение выдается в моем ServiceActivator, то сообщения не отправляются в мою очередь недоставленных сообщений и проверяются,

После прочтения другого поста в блоге link1 упоминается, что это происходит, потому что обработка происходит в другом потоке с помощью MessageGroupStoreReaper, а не в том, в котором был SimpleMessageListenerContainer.Как только обработка уходит из потока SimpleMessageListener, сообщения будут автоматически подтверждены.

Я добавил конфигурацию, упомянутую в ссылке выше, и вижу сообщения об ошибках, отправляемые в мой обработчик ошибок.Мой главный вопрос: что считается лучшим способом справиться с этим сценарием, чтобы минимизировать потерю сообщений.

Вот варианты, которые я изучал:

  • Использование BatchRabbitTemplateв моем настраиваемом обработчике ошибок опубликовать сообщения об ошибках в той же очереди недоставленных сообщений, в которую они бы отправились, если бы было выполнено выражение releaseExpression.(Это подход, который я изложил ниже, но я беспокоюсь о том, что сообщения могут быть потеряны, если во время публикации произойдет ошибка)

  • Исследуйте, если нет, я мог бы сообщить SimpleMessageListener об ошибкечто произошло, и он отправил пакет сообщений, которые не удалось в очередь мертвых писем?Я сомневаюсь, что это возможно, так как кажется, что сообщения уже подтверждены.

  • Не устанавливайте для SimpleMessageListenerContainer значение AcknowledgeMode.AUTO и вручную проверяйте сообщения, когда они обрабатываются с помощью Сервиса, когда встречается releaseExpression или groupTimeOut.(Это выглядит немного грязно, поскольку в MessageGroup может быть сообщение 1..N, но хотелось посмотреть, что сделали другие)

В идеале я хочу иметь поток, который будетэто будет имитировать тот же поток, когда будет выполнено выражение releaseExpression, чтобы сообщения не терялись.

Есть ли у кого-нибудь рекомендации о том, как лучше всего справиться с этим сценарием, который они использовали в прошлом?

Спасибо за любую помощь и / или совет!

Вот мой текущийконфигурирование с использованием Spring Integration DSL

@Bean
    public SimpleMessageListenerContainer workListenerContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConnectionFactory);
        container.setQueues(worksQueue());
        container.setConcurrentConsumers(4);
        container.setDefaultRequeueRejected(false);
        container.setTransactionManager(transactionManager);
        container.setChannelTransacted(true);
        container.setTxSize(10);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);          
        return container;
    }

  @Bean
    public AmqpInboundChannelAdapter inboundRabbitMessages() {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());       
        return adapter;
    }

Я определил канал ошибок и определил свой собственный taskScheduler для использования в MessageStoreRepear

   @Bean 
    public ThreadPoolTaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler  ts = new ThreadPoolTaskScheduler();
        MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
        mpe.setDefaultErrorChannel(myErrorChannel());
        ts.setErrorHandler(mpe);
        return ts;
    }


    @Bean
    public PollableChannel myErrorChannel() {
        return new QueueChannel();
    }
 public IntegrationFlow aggregationFlow() {
        return IntegrationFlows.from(inboundRabbitMessages())               
                .transform(Transformers.fromJson(SomeObject.class))             
                 .aggregate(a->{
                    a.sendPartialResultOnExpiry(true);                  
                    a.groupTimeout(3000);   
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);                    
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");                            
                    a.transactional(true);
                 }
                )               
                .handle("someService", "processMessages")
                .get();
    }

Вот мой пользовательский поток ошибок

@Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from("myErrorChannel")
                    .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
                            e -> e.poller(p -> p.fixedDelay(100)))
                    .channel("myErrorChannelHandler")
                    .handle("myErrorHandler","handleFailedMessage")
                    .log()
                    .get();
    }

Вот пользовательский обработчик ошибок

@Component
public class MyErrorHandler {

    @Autowired
    BatchingRabbitTemplate batchingRabbitTemplate;

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(Message<?> message) {       
        ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();        
        payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
    }

}

Вот bean-компонент BatchingRabbitTemplate

    @Bean   
    public BatchingRabbitTemplate batchingRabbitTemplate() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);
        scheduler.initialize();
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
        BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);    
        batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return batchingRabbitTemplate;
    }

Обновление 1 ) для отображения пользовательского MessageGroupProcessor:

public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    @Override
    protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
        return group;
    }
}

Пример службы:

@Slf4j
public class SomeService  {
    @ServiceActivator
    public void processMessages(MessageGroup messageGroup) throws IOException {
        Collection<Message<?>> messages  = messageGroup.getMessages();
        //Do business logic 
        //ack messages in the group
        for (Message<?> m : messages) {
            com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                    m.getHeaders().get("amqp_channel");
            long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
            log.debug(" deliveryTag = {}",deliveryTag);
            log.debug("Channel = {}",channel);
            channel.basicAck(deliveryTag, false);
        }
    }
}

Обновлен интеграционный поток

public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");
                    a.transactional(true);
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }

Новый ErrorHandler для nack

public class MyErrorHandler {

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
        if(messageGroup!=null) {
            log.debug("Nack messages size = {}", messageGroup.getMessages().size());
            Collection<Message<?>> messages  = messageGroup.getMessages();
            for (Message<?> m : messages) {
                com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                        m.getHeaders().get("amqp_channel");
                long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");           
                log.debug("deliveryTag = {}",deliveryTag);
                log.debug("channel = {}",channel);
                channel.basicNack(deliveryTag, false, false);
            }       
        }
    }
}

Обновление2 Добавлены пользовательские ReleaseStratgedy и изменено на aggegator

public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {

    private static final int MAX_MESSAGE_COUNT = 10;

    public boolean canRelease(MessageGroup messageGroup) {
        return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
    }
}
   public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");                   
                    a.transactional(true);
                    a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());            
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }

1 Ответ

0 голосов
/ 03 апреля 2019

В вашем понимании есть некоторые недостатки. Если вы используете AUTO, только последнее сообщение будет помечено буквами при возникновении исключения.Сообщения, успешно переданные в группу перед выпуском, будут немедленно подтверждены.

Единственный способ добиться того, чего вы хотите, - это использовать РУЧНЫЕ подтверждения (

). "скажите контейнеру слушателя отправлять сообщения в DLQ ".Контейнер никогда не отправляет сообщения в DLQ, он отклоняет сообщение, и посредник отправляет его в DLX / DLQ.

...