Я новичок в весенней интеграции DSL.
В рамках обучения я преобразовал один XML на основе конфигурации DSL Java
Вот ссылка для конфигурации XML.
https://dzone.com/articles/spring-integration-building.
Здесьмоя конфигурация Java DSL.
package study.pattern.integration.lab9.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;
import study.pattern.integration.lab9.domain.ItemType;
import study.pattern.integration.lab9.domain.Order;
import study.pattern.integration.lab9.domain.OrderItem;
import study.pattern.integration.lab9.service.OrderDelivery;
import study.pattern.integration.lab9.service.OrderItemsProcessor;
@EnableIntegration
@Configuration
@IntegrationComponentScan
public class IntegrationConfiguration {
@Autowired
OrderItemsProcessor processor;
@Autowired
OrderDelivery orderDelivery;
@Bean
@Qualifier("orderChannel")
public DirectChannel orderChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderItemsChannel")
public DirectChannel orderItemsChannel() {
return MessageChannels.direct().get();
}
/**
*
*
* QueueChannel: Implements PollableChannel.
* There’s one endpoint connected to the channel, no subscribers. This communication is asynchronous;
* the receiver will retrieve the message through a different thread. How it works:
The producer sends the message to the channel.
The channel queues the message.
The consumer actively retrieves the message (active receiver).
* @return
*/
@Bean
@Qualifier("musicItemsChannel")
public QueueChannel musicItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("softwareItemsChannel")
public QueueChannel softwareItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("booksItemChannel")
public QueueChannel booksItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("orderItemsProcessed")
public DirectChannel orderItemsProcessedChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderDelivered")
public DirectChannel orderDeliveredChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "myLogChannel")
public MessageHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.INFO.name());
loggingHandler.setLoggerName("logging");
return loggingHandler;
}
@Bean
IntegrationFlow processOrderFlow() {
return IntegrationFlows
.from(orderChannel())
.split(Order.class , a -> a.getOrderItems())
.channel(orderItemsChannel())
.wireTap(f -> f.handle(logger()))
.route(OrderItem.class,
o -> o.getType().name(),
type -> type.channelMapping(ItemType.BOOK.name(),booksItemsChannel())
.channelMapping(ItemType.MUSIC_CD.name(), musicItemsChannel())
.channelMapping(ItemType.SOFTWARE.name(), softwareItemsChannel())
)
.get();
}
@Bean
IntegrationFlow processBooksItemChannel() {
return IntegrationFlows.from(booksItemsChannel())
.handle(processor,"processBooksOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
.wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processMusicItemChannel() {
return IntegrationFlows.from(booksItemsChannel())
.handle(processor,"processMusicOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
.wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processSoftwareItemChannel() {
return IntegrationFlows.from(booksItemsChannel())
.handle(processor, "processSoftware", spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
.wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow aggreateAllProcessedOrderItems() {
return IntegrationFlows.from(orderItemsProcessedChannel())
.aggregate(spec -> spec.processor(orderDelivery, "delivery").get())
.handle(orderDeliveredChannel())
.wireTap(f -> f.handle(logger()))
.log()
.get();
}
}
Ниже метода класса OrderDelivery.
public class OrderDelivery {
public Order delivery(GenericMessage<List<OrderItem>> orderItems) {
log.info("Delivery of the order /......");
final Order order = new Order();
orderItems.getPayload().stream().forEach(orderItem -> order.addItem(orderItem));
return order;
}
}
Когда я запускаю свой код, я получаю следующее исключение.
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'aggreateAllProcessedOrderItems' defined in class path resource [study/pattern/integration/lab9/config/IntegrationConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'aggreateAllProcessedOrderItems' threw exception; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface org.springframework.messaging.MessageHandler] for method match: [public void org.springframework.integration.channel.AbstractMessageChannel.setInterceptors(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public static java.util.UUID org.springframework.integration.context.IntegrationObjectSupport.generateId(), public void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver), public final void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression), public void org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class[]), public org.springframework.messaging.support.ChannelInterceptor org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(int), public void org.springframework.integration.channel.AbstractMessageChannel.registerMetricsCaptor(org.springframework.integration.support.management.metrics.MetricsCaptor), public boolean org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public boolean org.springframework.integration.channel.AbstractSubscribableChannel.subscribe(org.springframework.messaging.MessageHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory), public void org.springframework.integration.channel.AbstractMessageChannel.setShouldTrack(boolean), public void org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter)]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:645) ~[spring-beans-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:475) ~[spring-beans-5.2.1.RELEASE.jar:5.2.1.RELEASE]
Я попытался отладить, но я не нашел никакой подсказки, что является ошибкой кода, которая вызывает проблему. (Я попытался обернуть аргументы метода доставки интерфейсами Message и GenericMessage, но проблема все еще та же)
Может кто-нибудь помочь мне решить проблему.