Весенняя интеграция - издатель подтверждает время ожидания? - PullRequest
0 голосов
/ 15 февраля 2019

Это моя текущая настройка:

queue1 и queue2 помечены вместе с интеграционным потоком на канал1:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

затем все агрегируется и затем подтверждается после того, как агрегированное сообщение подтверждаетсяrabbitmq:

@Bean
    public IntegrationFlow aggregatingFlow() {
        return IntegrationFlows
                .from(amqpInputChannel())
                .aggregate(...
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
                )
                .handle(amqpOutboundEndpoint())
                .get();
    }

    @Bean
    public AmqpOutboundEndpoint amqpOutboundEndpoint() {
        AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
        outboundEndpoint.setConfirmAckChannel(manualAckChannel());
        outboundEndpoint.setConfirmCorrelationExpressionString("#root");
        outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
        outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
        return outboundEndpoint;
    }

ackTemplate() устанавливается с cf, который имеет springFactory.setPublisherConfirms(true);.

Проблема, которую я вижу, состоит в том, что раз в 10 дней некоторые сообщения застряли в unacknowledged состояние в rabbitmq.

Я предполагаю, что каким-то образом публикация сообщения ждет кролика, чтобы он сделал PUBLISHER CONFIRMS, но он никогда не получает его и время ожидания?В этом случае я никогда не получаю сообщение ACK в queue1.Возможно ли это?

Итак, еще один полный рабочий процесс:

[две очереди -> прямой канал -> агрегатор (сохраняет значения канала и тега) -> публикация для кролика -> возврат кроликаACK через издателя подтверждает -> spring подтверждает все сообщения на канале + значения, которые он хранит в памяти для агрегированного сообщения]

У меня также есть моя реализация агрегатора (так как мне нужно вручную подтверждать сообщения из q1 и q2):

public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
    private AckingState ackingState;

    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
        this.ackingState = ackingState;
    }

    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
        List<ManualAckPair> manualAckPairs = new ArrayList<>();
        group.getMessages().forEach(m -> {
            Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
        });
        aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
        return aggregatedHeaders;
    }
}

ОБНОВЛЕНИЕ

Так выглядит администратор кролика (2 неиспользованных сообщения в течение длительного времени, и они не будут приняты до перезапуска - когда это произойдетдоставлено): enter image description here

1 Ответ

0 голосов
/ 21 февраля 2019

В Spring AMQP версии 2.1 (Spring Integration 5.1) мы добавили Future<?> и вернули сообщение в CorrelationData, чтобы помочь с такими вещами.Если вы используете более старую версию, вы можете создать подкласс CorrelationData (и вам придется обрабатывать установку будущего и возвращаемого сообщения в вашем коде).

Это, вместе с запланированной задачей, может обнаружить пропущенноеacks ...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

.

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting

В Spring Integration есть confirmCorrelationExpression, который можно использовать для создания экземпляра CorrelationData.

РЕДАКТИРОВАТЬ

С пружинной интеграцией ...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                    .handle(Amqp.outboundAdapter(template)
                            .confirmCorrelationExpression("@correlationCreator.create(#root)")
                            .exchangeNameExpression("headers.exchange")
                            .routingKeyExpression("headers.rk")
                            .returnChannel(returns())
                            .confirmAckChannel(acks())
                            .confirmNackChannel(acks()))
                    .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());

            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}
...