мой класс конфигурации интеграции находится ниже ,, когда я выполняю какой-то модульный тест на них, обнаружил, что: когда я отправляю сообщение на UserRecipientSubscribeCacheChannel
, он работает хорошо;Когда я отправляю сообщение на верхний уровень канала userReportWriteCompletedRouteChannel
, он не работает, и никаких исключений пока не выдается.я не могу понять этоконечно, сообщения, которые я отправлял, одинаковы.
из-за раздела сбоя следующий обработчик не может работать нормально.ты !!
ниже работает нормально, печатается ===>ip location channel message:GenericMessage [payload=[MailRecipientActionDocumen...
и ===>user recipient channel message:GenericMessage [payload=[UserRecipientSubscribeDataRedisStructure...
@Test
public void test_sendMessageUserRecipientSubscribeCacheChannel(){
UserRecipientSubscribeCacheChannel.send(createMessageWithIp());
}
ниже работает, печатается ===>ip location channel message:GenericMessage [payload=[MailRecipientActionDocumen...
только
уведомлениечто: секция сбоя, перед обработчиком есть трансформатор.
@Test
public void test_sendMessageToRouteChannel() {
userReportWriteCompletedRouteChannel.send(createMessageWithIp());
}
мой код конфигурации ниже:
@Bean
public SubscribableChannel userReportWriteCompletedSubscribeChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel userReportWriteCompletedRouteChannel() {
return new QueueChannel();
}
@Bean
public MessageChannel ipLocationResolveCacheChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel userRecipientSubscribeCacheChannel() {
return new DirectChannel();
}
@MessagingGateway(name = "userReportWriteCompletedListener",
defaultRequestChannel = "userReportWriteCompletedRouteChannel")
public interface UserReportWriteCompletedListener {
@Gateway
void receive(List<UserMailRecipientActionDocument> docs);
}
@Bean
public IntegrationFlow bridgeFlow() {
return flow -> flow.channel("userReportWriteCompletedRouteChannel")
.bridge(bridgeSpe -> bridgeSpe
.poller(pollerFactory -> pollerFactory.fixedRate(500).maxMessagesPerPoll(1)))
.channel("userReportWriteCompletedSubscribeChannel")
;
}
@Bean
public IntegrationFlow subscribeFlow() {
return IntegrationFlows.from("userReportWriteCompletedSubscribeChannel")
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.channel(IP_LOCATION_RESOLVE_CACHE_CHANNEL)
)
.subscribe(flow -> flow
.channel(USER_RECIPIENT_SUBSCRIBE_CACHE_CHANNEL)
))
.get();
}
@Bean
public RedisStoreWritingMessageHandler ipLocationResolveCacheHandler(RedisTemplate<String, ?> redisTemplate) {
final RedisStoreWritingMessageHandler ipLocationResolveCacheHandler =
new RedisStoreWritingMessageHandler(redisTemplate);
ipLocationResolveCacheHandler.setKey("IP_LOCATION_RESOLVE_CACHE");
return ipLocationResolveCacheHandler;
}
@Bean
public RedisStoreWritingMessageHandler userRecipientSubscribeCacheHandler(RedisTemplate<String, ?> redisTemplate) {
final RedisStoreWritingMessageHandler userRecipientSubscribeCacheHandler =
new RedisStoreWritingMessageHandler(redisTemplate);
userRecipientSubscribeCacheHandler.setKey("USER_RECIPIENT_SUBSCRIBE_CACHE");
return userRecipientSubscribeCacheHandler;
}
@Bean
public IpLocationResolveRedisStructureFilterAndTransformer recipientActionHasIpFilterAndTransformer() {
return new IpLocationResolveRedisStructureFilterAndTransformer();
}
@Bean
public UserRecipientSubscribeDataRedisStructureTransformer subscribeDataRedisStructureTransformer(
IpLocationClient ipLocationClient) {
return new UserRecipientSubscribeDataRedisStructureTransformer(ipLocationClient);
}
@Bean
public IntegrationFlow ipLocationResolveCacheFlow(
@Qualifier("ipLocationResolveCacheHandler") RedisStoreWritingMessageHandler writingMessageHandler) {
return flow -> flow.channel(IP_LOCATION_RESOLVE_CACHE_CHANNEL)
.handle(message -> {
System.out.println("===>ip location channel message:" + message);
})
;
}
@Bean
public IntegrationFlow userRecipientActionDataCacheFlow(
@Qualifier("userRecipientSubscribeCacheHandler") RedisStoreWritingMessageHandler messageHandler,
UserRecipientSubscribeDataRedisStructureTransformer transformer) {
return flow -> flow.channel(USER_RECIPIENT_SUBSCRIBE_CACHE_CHANNEL)
.transform(transformer)
.handle(message -> {
System.out.println("===>user recipient channel message:" + message);
})
}
Я ожидаю 2 сообщения о печати, но только 1.