Я пытаюсь найти лучший способ обработки ошибок, которые могли возникнуть в службе, которая вызывается после истечения времени ожидания группы агрегата, которая имитирует тот же поток, как если бы было выполнено выражение releaseExpression.
Здесьмои настройки:
У меня есть AmqpInboundChannelAdapter, который принимает сообщения и отправляет их моему агрегатору.
Когда будет выполнено выражение releaseExpression и до истечения срока действия groupTimeout, если будет выдано исключениемой ServiceActivator, сообщения отправляются в мою очередь недоставленных сообщений для всех сообщений в этой группе сообщений.(10 сообщений в моем примере ниже, который используется только в иллюстративных целях) Это то, что я ожидал.
Если мое releaseExpression не было выполнено, но groupTimeout было выполнено, и время ожидания группы истекло, если исключение выдается в моем ServiceActivator, то сообщения не отправляются в мою очередь недоставленных сообщений и проверяются,
После прочтения другого поста в блоге link1 упоминается, что это происходит, потому что обработка происходит в другом потоке с помощью MessageGroupStoreReaper, а не в том, в котором был SimpleMessageListenerContainer.Как только обработка уходит из потока SimpleMessageListener, сообщения будут автоматически подтверждены.
Я добавил конфигурацию, упомянутую в ссылке выше, и вижу сообщения об ошибках, отправляемые в мой обработчик ошибок.Мой главный вопрос: что считается лучшим способом справиться с этим сценарием, чтобы минимизировать потерю сообщений.
Вот варианты, которые я изучал:
Использование BatchRabbitTemplateв моем настраиваемом обработчике ошибок опубликовать сообщения об ошибках в той же очереди недоставленных сообщений, в которую они бы отправились, если бы было выполнено выражение releaseExpression.(Это подход, который я изложил ниже, но я беспокоюсь о том, что сообщения могут быть потеряны, если во время публикации произойдет ошибка)
Исследуйте, если нет, я мог бы сообщить SimpleMessageListener об ошибкечто произошло, и он отправил пакет сообщений, которые не удалось в очередь мертвых писем?Я сомневаюсь, что это возможно, так как кажется, что сообщения уже подтверждены.
Не устанавливайте для SimpleMessageListenerContainer значение AcknowledgeMode.AUTO и вручную проверяйте сообщения, когда они обрабатываются с помощью Сервиса, когда встречается releaseExpression или groupTimeOut.(Это выглядит немного грязно, поскольку в MessageGroup может быть сообщение 1..N, но хотелось посмотреть, что сделали другие)
В идеале я хочу иметь поток, который будетэто будет имитировать тот же поток, когда будет выполнено выражение releaseExpression, чтобы сообщения не терялись.
Есть ли у кого-нибудь рекомендации о том, как лучше всего справиться с этим сценарием, который они использовали в прошлом?
Спасибо за любую помощь и / или совет!
Вот мой текущийконфигурирование с использованием Spring Integration DSL
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(4);
container.setDefaultRequeueRejected(false);
container.setTransactionManager(transactionManager);
container.setChannelTransacted(true);
container.setTxSize(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public AmqpInboundChannelAdapter inboundRabbitMessages() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());
return adapter;
}
Я определил канал ошибок и определил свой собственный taskScheduler для использования в MessageStoreRepear
@Bean
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler ts = new ThreadPoolTaskScheduler();
MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
mpe.setDefaultErrorChannel(myErrorChannel());
ts.setErrorHandler(mpe);
return ts;
}
@Bean
public PollableChannel myErrorChannel() {
return new QueueChannel();
}
public IntegrationFlow aggregationFlow() {
return IntegrationFlows.from(inboundRabbitMessages())
.transform(Transformers.fromJson(SomeObject.class))
.aggregate(a->{
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
}
)
.handle("someService", "processMessages")
.get();
}
Вот мой пользовательский поток ошибок
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from("myErrorChannel")
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.channel("myErrorChannelHandler")
.handle("myErrorHandler","handleFailedMessage")
.log()
.get();
}
Вот пользовательский обработчик ошибок
@Component
public class MyErrorHandler {
@Autowired
BatchingRabbitTemplate batchingRabbitTemplate;
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(Message<?> message) {
ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();
payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
}
}
Вот bean-компонент BatchingRabbitTemplate
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);
batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
return batchingRabbitTemplate;
}
Обновление 1 ) для отображения пользовательского MessageGroupProcessor:
public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return group;
}
}
Пример службы:
@Slf4j
public class SomeService {
@ServiceActivator
public void processMessages(MessageGroup messageGroup) throws IOException {
Collection<Message<?>> messages = messageGroup.getMessages();
//Do business logic
//ack messages in the group
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug(" deliveryTag = {}",deliveryTag);
log.debug("Channel = {}",channel);
channel.basicAck(deliveryTag, false);
}
}
}
Обновлен интеграционный поток
public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
Новый ErrorHandler для nack
public class MyErrorHandler {
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
if(messageGroup!=null) {
log.debug("Nack messages size = {}", messageGroup.getMessages().size());
Collection<Message<?>> messages = messageGroup.getMessages();
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug("deliveryTag = {}",deliveryTag);
log.debug("channel = {}",channel);
channel.basicNack(deliveryTag, false, false);
}
}
}
}
Обновление2 Добавлены пользовательские ReleaseStratgedy и изменено на aggegator
public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {
private static final int MAX_MESSAGE_COUNT = 10;
public boolean canRelease(MessageGroup messageGroup) {
return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
}
}
public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.transactional(true);
a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}