Это моя текущая настройка:
queue1 и queue2 помечены вместе с интеграционным потоком на канал1:
@Bean
public IntegrationFlow q1f() {
return IntegrationFlows
.from(queue1InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
@Bean
public IntegrationFlow q2f() {
return IntegrationFlows
.from(queue2InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
затем все агрегируется и затем подтверждается после того, как агрегированное сообщение подтверждаетсяrabbitmq:
@Bean
public IntegrationFlow aggregatingFlow() {
return IntegrationFlows
.from(amqpInputChannel())
.aggregate(...
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10))
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
)
.handle(amqpOutboundEndpoint())
.get();
}
@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
outboundEndpoint.setConfirmAckChannel(manualAckChannel());
outboundEndpoint.setConfirmCorrelationExpressionString("#root");
outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
return outboundEndpoint;
}
ackTemplate()
устанавливается с cf, который имеет springFactory.setPublisherConfirms(true);
.
Проблема, которую я вижу, состоит в том, что раз в 10 дней некоторые сообщения застряли в unacknowledged
состояние в rabbitmq.
Я предполагаю, что каким-то образом публикация сообщения ждет кролика, чтобы он сделал PUBLISHER CONFIRMS
, но он никогда не получает его и время ожидания?В этом случае я никогда не получаю сообщение ACK в queue1
.Возможно ли это?
Итак, еще один полный рабочий процесс:
[две очереди -> прямой канал -> агрегатор (сохраняет значения канала и тега) -> публикация для кролика -> возврат кроликаACK через издателя подтверждает -> spring подтверждает все сообщения на канале + значения, которые он хранит в памяти для агрегированного сообщения]
У меня также есть моя реализация агрегатора (так как мне нужно вручную подтверждать сообщения из q1 и q2):
public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
private AckingState ackingState;
public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
this.ackingState = ackingState;
}
@Override
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
List<ManualAckPair> manualAckPairs = new ArrayList<>();
group.getMessages().forEach(m -> {
Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
});
aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
return aggregatedHeaders;
}
}
ОБНОВЛЕНИЕ
Так выглядит администратор кролика (2 неиспользованных сообщения в течение длительного времени, и они не будут приняты до перезапуска - когда это произойдетдоставлено): ![enter image description here](https://i.stack.imgur.com/cBblm.png)