Я создаю простое приложение для весенней загрузки с использованием интеграции с пружиной. Ниже приведены три основные конструкции этого приложения:
- Входящий шлюз: WebFluxInboundEndpoint, который принимает http-запрос
- Исходящий шлюз: PubSubMessageHandler, который отправляет сообщение в облако Google pubsub topi c
- Канал сообщений: FluxMessageChannel, действующий в качестве канала запроса
Облачное хранилище Google PubSubMessageHandler обеспечивает обратный вызов сбоя и успеха, из-за которого ответ об ошибке / успехе не возвращается обратно в конечную точку webflux, и запрос ожидает неопределенное время.
Спросите: как ответ об успехе / неудаче может быть возвращен после получения ответа от pubsub?
Рабочая копия приложения доступна здесь: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
Чтобы запустить приложение, введите свой облачный сервисный ключ Google в файл serviceAccountKey. json, а затем укажите переменную среды GOOGLE_APPLICATION_CREDENTIALS = / PATH_TO / serviceAccountKey. json
Пример запроса: curl -d "name = piyu sh" http://localhost: 8080 / cr eatePerson
Ниже приведен пример файла, который принимает указанный выше запрос, и после преобразования в весеннее сообщение он помещается в pubsub topi c "person"
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler which will consume messages from message channel.
* Then it will send google cloud pubsub topic.
*/
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setPublishCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
LOGGER.info("Message was sent successfully.");
}
});
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
return endpoint;
}
}
build.gradle Зависимости:
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-webflux'
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
test {
useJUnitPlatform()
}
Новый файл приложения после создания PubSubMessageHandler в качестве syn c и добавления ExpressionEvaluatingRequestHandlerAdvice, но это приводит к ошибке «beanFactory не должен быть нулевым», когда MessagingGatewaySupport создает Correlator.
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel errorChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler which will consume messages from message channel.
* Then it will send google cloud pubsub topic.
*/
@Bean
@ServiceActivator(
inputChannel = "pubSubOutputChannel",
adviceChain = "expressionAdvice"
)
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setSync(true);
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
endpoint.setReplyChannel(replyChannel());
endpoint.setErrorChannel(errorChannel());
return endpoint;
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannel(replyChannel());
advice.setFailureChannel(errorChannel());
return advice;
}
}
Stacktrace: ошибка, возникающая после отправки http-запроса:
2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1] 500 Server Error for HTTP POST "/createPerson"
java.lang.IllegalArgumentException: 'beanFactory' must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]