Spring IntegrationFlow с RestController - PullRequest
0 голосов
/ 27 мая 2020

Я хотел бы создать приложение, которое выполняло бы следующие шаги:

  1. Получение запроса через RestController
  2. Отправка полученного сообщения в очередь (AMQP - MessageChannel) (correlationId ?)
  3. Дождаться ответа в другой очереди (AMQP - MessageChannel) (correlationId?)
  4. Вернуть ответ в том же потоке, что и запрос на шаге 1.

Я думал об использовании для этого IntegrationFlow, но не могу адаптировать шаги.

Кроме того, знаете ли вы, как лучше всего реализовать этот поток?

I пытался реализовать с помощью следующего кода:

package poc.integration.http;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.inbound.HttpRequestHandlingMessagingGateway;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@EnableIntegration
public class IntegrationFlowConfig {

    final Logger logger = LoggerFactory.getLogger(IntegrationFlowConfig.class);

    @Bean
    public HttpRequestHandlingMessagingGateway inbound() {
        HttpRequestHandlingMessagingGateway gateway = new HttpRequestHandlingMessagingGateway(true);
        gateway.setRequestMapping(mapping());
        gateway.setRequestPayloadTypeClass(String.class);
        gateway.setRequestChannelName("httpRequest");
        return gateway;
    }

    @Bean
    public RequestMapping mapping() {
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/foo");
        requestMapping.setMethods(HttpMethod.GET);
        return requestMapping;
    }

    @ServiceActivator(inputChannel = "httpResponse")
    @Bean
    public HttpRequestExecutingMessageHandler outbound() {
        HttpRequestExecutingMessageHandler handler =
            new HttpRequestExecutingMessageHandler("http://10.141.201.206:80/foo");
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);
        return handler;
    }

    @ServiceActivator(inputChannel="httpRequest", outputChannel="httpResponse")
    public Object processarMensagem(Object mensagem) {
        return mensagem + " - done";
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerAmqp(ThreadPoolTaskScheduler taskScheduler) {
        final PollerMetadata poller = new PollerMetadata();
        poller.setTaskExecutor(taskScheduler);
        poller.setReceiveTimeout(-1);
        return poller;
    }

    @Bean
    public MessageChannel httpRequest(AmqpTemplate amqpTemplate) {
        PollableAmqpChannel channel = new PollableAmqpChannel("httpRequest", amqpTemplate,
                DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
        channel.setExtractPayload(true);
        return channel;
    }

    @Bean
    public MessageChannel httpResponse(AmqpTemplate amqpTemplate) {
        PollableAmqpChannel channel = new PollableAmqpChannel("httpResponse", amqpTemplate,
                DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
        channel.setExtractPayload(true);
        return channel;
    }

}

, но я получаю сообщение: Ответ не получен в течение таймаута

1 Ответ

1 голос
/ 27 мая 2020

Вам просто нужен IntegrationFlow с Http.inboundControllerAdapter() в качестве отправной точки. Он полностью заменяет упомянутый RestController, но позволяет избежать дополнительной работы, связанной с мостом от @RestController к IntegrationFlow и обратно.

Следующим шагом в потоке должен быть Amqp.outboundGateway() для отправки и получения через AMQP. Этот позаботится о корреляции за вас.

Подробнее см. В документации:

https://docs.spring.io/spring-integration/docs/5.3.0.RELEASE/reference/html/http.html#http - java -config

https://docs.spring.io/spring-integration/docs/5.3.0.RELEASE/reference/html/amqp.html#configuring -с-java -dsl-4

...