Spring Integration извлекает разбитые на страницы результаты из службы REST - PullRequest
0 голосов
/ 08 февраля 2019

Я работаю над интеграцией со службой REST, идея в том, что она опрашивается исходящим шлюзом marketingCategoryOutboundGateway, реализованным HttpRequestExecutingMessageHandler.Шлюз отправляет запрос в службу REST и передает свой ответ на канал marketingCategory.Сам шлюз запускается сообщением, созданным marketingCategoryPollerMessageSource с использованием фабричного метода makeTriggeringMessage.

Проблема заключается в том, что служба возвращает постраничные результаты.Я что-то , которое прослушивало бы на канале marketingCategory, кроме уже имеющегося у меня активатора службы, проверяет ответ и отправляет новое сообщение с увеличенным номером страницы, созданным makeTriggeringMessage, на marketingCategoryPoller channel, чтобы код вращался в цикле, пока не получит все страницы из службы REST.

Позволяет ли Spring Integration создавать такие фильтры, которые получают одно сообщение на входном канале, проверять его на соответствиеи выдать новое сообщение на выходной канал, если условие истинно?

Код:

//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}

//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}

//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}

//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
    //make a message for triggering marketingCategoryOutboundGateway
    return MessageBuilder.withPayload("")
            .setHeader("Host", "eclinic")
            .setHeader("page", page)
            .build();
}

//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
    //make a request to the REST service and push the response to the marketingCategory channel
}

//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
    return (msg) -> {
        //process the categories returned by marketingCategoryOutboundGateway
    };
}

1 Ответ

0 голосов
/ 08 февраля 2019

Я нашел решение на основе этой публикации Чтение и загрузка из разбитых на страницы REST-Services с пружинной интеграцией :

  1. Запуск исходящего шлюза, который говоритв службу REST и отправляет ответ на канал с помощью адаптера входящего канала с помощью программы опроса.Адаптер входящего канала - это источник сообщений, который изначально генерирует сообщение с заголовком, указывающим номер страницы, который нужно извлечь из REST API.Заголовок сообщения страницы используется исходящим шлюзом для генерации URL-адреса, задающего нужную страницу

  2. Канал, на который исходящий шлюз отправляет ответы службы REST, имеет 2 подписчика:

    2,1.сервисный активатор, который что-то делает с извлеченными данными

    2.2.фильтр, который проверяет, является ли это последней страницей, а если нет, то отправляет сообщение далее в другой канал, используемый обогащателем заголовка

  3. Получив сообщение, обогащение заголовка увеличивает егозаголовок страницы и отправляет сообщение дальше к каналу, который запускает исходящий шлюз, шлюз считывает увеличенный заголовок страницы и извлекает следующую страницу из службы REST

  4. Цикл продолжает вращаться до тех пор, покаСлужба REST возвращает последнюю страницу.Фильтр не пропускает это сообщение в обогащающий заголовок, прерывая цикл.

Полный код:

@Configuration
public class IntegrationConfiguration {

    private final ApiGateConfig apiGateConfig;

    IntegrationConfiguration(ApiGateConfig apiGateConfig) {
        this.apiGateConfig = apiGateConfig;
    }

    @Bean("marketingCategory")
    MessageChannel marketingCategory() {
        return new PublishSubscribeChannel();
    }

    @Bean
    MessageChannel marketingCategoryPoller() {
        return new DirectChannel();
    }

    @Bean
    MessageChannel marketingCategoryPollerNextPage() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
    public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
        return () -> makeTriggeringMessage(0);
    }

    /**
     * Build a gateway triggering message
     */
    private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
        return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
                .setHeader("Host", "eclinic")
                .setHeader("page", page)
                .build();
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategoryPoller")
    public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {

        String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";

        //the type of the payload
        ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
        };

        //page number comes from the message
        SpelExpressionParser expressionParser = new SpelExpressionParser();
        var uriVariables = new HashMap<String, Expression>();
        uriVariables.put("page", expressionParser.parseExpression("headers.page"));

        HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
        handler.setOutputChannel(channel);
        handler.setUriVariableExpressions(uriVariables);

        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "marketingCategory")
    public MessageHandler marketingCategoryHandler() {
        return (msg) -> {
            var page = (RestPageImpl<MarketingCategory>) msg.getPayload();

            System.out.println("Page #" + page.getNumber());

            page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));

        };
    }

    @Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
    public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
        return !page.isLast();
    }

    @Bean
    @Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
    HeaderEnricher incrementPage() {
        Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
        Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");

        var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
        valueProcessor.setOverwrite(true);

        headersToAdd.put("page", valueProcessor);
        return new HeaderEnricher(headersToAdd);
    }
}
...