Как вернуть ответ на конечную точку webflux после отправки сообщения в облачный паб Google? - PullRequest
1 голос
/ 02 мая 2020

Я создаю простое приложение для весенней загрузки с использованием интеграции с пружиной. Ниже приведены три основные конструкции этого приложения:

  1. Входящий шлюз: WebFluxInboundEndpoint, который принимает http-запрос
  2. Исходящий шлюз: PubSubMessageHandler, который отправляет сообщение в облако Google pubsub topi c
  3. Канал сообщений: FluxMessageChannel, действующий в качестве канала запроса

Облачное хранилище Google PubSubMessageHandler обеспечивает обратный вызов сбоя и успеха, из-за которого ответ об ошибке / успехе не возвращается обратно в конечную точку webflux, и запрос ожидает неопределенное время.

Спросите: как ответ об успехе / неудаче может быть возвращен после получения ответа от pubsub?

Рабочая копия приложения доступна здесь: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

Чтобы запустить приложение, введите свой облачный сервисный ключ Google в файл serviceAccountKey. json, а затем укажите переменную среды GOOGLE_APPLICATION_CREDENTIALS = / PATH_TO / serviceAccountKey. json

Пример запроса: curl -d "name = piyu sh" http://localhost: 8080 / cr eatePerson

Ниже приведен пример файла, который принимает указанный выше запрос, и после преобразования в весеннее сообщение он помещается в pubsub topi c "person"

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setPublishCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                LOGGER.info("Message was sent successfully.");
            }
        });

        return handler;
    }

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    }
}

build.gradle Зависимости:

plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.SR4")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-webflux'
    implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

Новый файл приложения после создания PubSubMessageHandler в качестве syn c и добавления ExpressionEvaluatingRequestHandlerAdvice, но это приводит к ошибке «beanFactory не должен быть нулевым», когда MessagingGatewaySupport создает Correlator.

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel replyChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "expressionAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    }

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());
        endpoint.setReplyChannel(replyChannel());
        endpoint.setErrorChannel(errorChannel());

        return endpoint;
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannel(replyChannel());
        advice.setFailureChannel(errorChannel());
        return advice;
    }
}

Stacktrace: ошибка, возникающая после отправки http-запроса:

2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1]  500 Server Error for HTTP POST "/createPerson"

java.lang.IllegalArgumentException: 'beanFactory' must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
        at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]

Ответы [ 2 ]

0 голосов
/ 09 мая 2020

Спасибо @Артем. Я решил эту проблему, предоставив пользовательский совет обработчика запросов, который идентифицирует responseChannel из заголовка сообщения в сценарии успеха и отправляет полезную нагрузку сообщения в ответ на конечную точку weblflux.

Для сценария ошибок я полагаюсь на механизм обработки ошибок ReactiveStreamsConsumer, который внутренне использует errorChannel для отправки ошибки обратно в конечную точку weblflux.

Пожалуйста, сообщите, правильна ли эта реализация.

Ниже приведен код для PubSubRequestHandlerAdvice:

package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

  private final MessagingTemplate messagingTemplate = new MessagingTemplate();

  @Override
  protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {

    Object result = callback.execute();

    Object evalResult = message.getPayload();
    MessageChannel successChannel = null;
    Object replyChannelHeader = message.getHeaders().getReplyChannel();
    if (replyChannelHeader instanceof MessageChannel) {
      successChannel = (MessageChannel) replyChannelHeader;
    }

    if (evalResult != null && successChannel != null) {
      AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
      this.messagingTemplate.send(successChannel, resultMessage);
    }
    return result;
  }
}

Конечный файл приложения, использующий PubSubRequestHandlerAdvice для PubSubMessageHandler.

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "pubSubAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    }

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    }

    @Bean
    public Advice pubSubAdvice() {
        return new PubSubRequestHandlerAdvice();
    }

}

Рабочая копия приложения доступна здесь: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

0 голосов
/ 02 мая 2020

PubSubMessageHandler не предназначен для поведения запроса / ответа. В большинстве случаев он используется как send-n-забудьте.

Поскольку вы действительно беспокоитесь об ответе об успехе / неудаче, я могу предложить только что-то вроде:

  1. PubSubMessageHandler.setSync(true):

    /**
     * Set publish method to be synchronous or asynchronous.
     *
     * <p>Publish is asynchronous be default.
     * @param sync true for synchronous, false for asynchronous
     */
    public void setSync(boolean sync) {
    

Таким образом, ваш PubSubMessageHandler будет ждать pubsubFuture.get();, а в случае неудачи MessageHandlingException будет брошен.

Чтобы справиться с успехом или неудачей для этого sync сценария, я предлагаю взглянуть на ExpressionEvaluatingRequestHandlerAdvice с successChannel и failureChannel. Где onSuccessExpression Я думаю, что #root должно указывать на requestMessage. onFailureExpression может обращаться к переменной выражения #exception SpEL, но все равно распространять requestMessage в failureChannel. Причина, по которой я говорю о requestMessage, потому что он имеет столь важный replyChannel, чтобы ответить на этот WebFluxInboundEndpoint запрос. См. Дополнительную информацию в документах: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message -handler-advice-chain

Эти подпотоки successChannel и failureChannel и отказов должны правильно отвечать некоторые возвращаются, оставляя свои outputChannel пустыми.

Но в то же время я полностью согласен с тем, что было бы намного проще сделать это PubSubMessageHandler, как AbstractReplyProducingMessageHandler, возвращая некоторые ListenableFuture чтобы мы могли обработать результат публикации.

...