Как передать заголовок с помощью WireTap? - PullRequest
0 голосов
/ 30 октября 2019

Теперь у меня есть следующий поток:

flow -> flow.channel(some_channel())
                .....
                .gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));

Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;
}

Проблема здесь в том, что:

anotherFlow заканчивается MessageHandler, что составляет void, поэтомуanotherFlow ничего не возвращает.

Я пытался использовать следующий подход:

 flow -> flow.channel(some_channel())
                    .....
                    .wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));

, но компилятор жалуется на idempotentByHeader тип возвращаемого значения, поэтому я попытался сделать следующее:

default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

но у WireTapSpec нет метода совета.

как это исправить?

PS

Мне удалось написать с изменением типа возвращаемого значения idempotentByHeader

            .wireTap(anotherFlow)
            .enrich(idempotentByHeader(OBJECT_ID_HEADER));

Но теперь приложение не может запуститься из-за:

Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
    at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
    ... 42 common frames omitted

1 Ответ

1 голос
/ 30 октября 2019

OK. Вам не хватает того факта, что WireTap является канальным перехватчиком . Это не конечная точка подобный шлюз, который может принимать идемпотентный приемник-перехватчик.

Я не уверен, какова ваша цель с этим idempotentByHeaderInterceptor, но заголовки действительно передаются всообщение, которое будет отправлено в эту WireTap. Поэтому вы получаете доступ к заголовкам в подпотоке, подписанном на эту WireTap.

Также ваш последний пример с enrich() немного смущает меня. Раньше со шлюзом вы пытались избежать отправки того же сообщения в этот подпоток через idempotentByHeaderInterceptor, но теперь вы отправляете на wireTap безоговорочно и только после этого применяете так, чтобы idempotentByHeaderInterceptor.

Итак, какова цель вашего idempotentByHeaderInterceptor и где вы хотели бы его применить?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...