модульный тест publishSubscribeChannel не может работать хорошо - PullRequest
0 голосов
/ 28 декабря 2018

мой класс конфигурации интеграции находится ниже ,, когда я выполняю какой-то модульный тест на них, обнаружил, что: когда я отправляю сообщение на UserRecipientSubscribeCacheChannel, он работает хорошо;Когда я отправляю сообщение на верхний уровень канала userReportWriteCompletedRouteChannel, он не работает, и никаких исключений пока не выдается.я не могу понять этоконечно, сообщения, которые я отправлял, одинаковы.

из-за раздела сбоя следующий обработчик не может работать нормально.ты !!

ниже работает нормально, печатается ===>ip location channel message:GenericMessage [payload=[MailRecipientActionDocumen... и ===>user recipient channel message:GenericMessage [payload=[UserRecipientSubscribeDataRedisStructure...

 @Test
public void test_sendMessageUserRecipientSubscribeCacheChannel(){
    UserRecipientSubscribeCacheChannel.send(createMessageWithIp());
}  

ниже работает, печатается ===>ip location channel message:GenericMessage [payload=[MailRecipientActionDocumen... только

уведомлениечто: секция сбоя, перед обработчиком есть трансформатор.

@Test
public void test_sendMessageToRouteChannel() {
    userReportWriteCompletedRouteChannel.send(createMessageWithIp());
}

мой код конфигурации ниже:

@Bean  
public SubscribableChannel userReportWriteCompletedSubscribeChannel() {  
    return new DirectChannel();  
} 

@Bean
public QueueChannel userReportWriteCompletedRouteChannel() {
    return new QueueChannel();
}

@Bean
public MessageChannel ipLocationResolveCacheChannel() {
    return new DirectChannel();
}

@Bean
public MessageChannel userRecipientSubscribeCacheChannel() {
    return new DirectChannel();
}

@MessagingGateway(name = "userReportWriteCompletedListener",
        defaultRequestChannel = "userReportWriteCompletedRouteChannel")
public interface UserReportWriteCompletedListener {
    @Gateway
    void receive(List<UserMailRecipientActionDocument> docs);
}

@Bean
public IntegrationFlow bridgeFlow() {
    return flow -> flow.channel("userReportWriteCompletedRouteChannel")
            .bridge(bridgeSpe -> bridgeSpe
                    .poller(pollerFactory -> pollerFactory.fixedRate(500).maxMessagesPerPoll(1)))

            .channel("userReportWriteCompletedSubscribeChannel")
            ;
}

@Bean
public IntegrationFlow subscribeFlow() {
    return IntegrationFlows.from("userReportWriteCompletedSubscribeChannel")
            .publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
                    .subscribe(flow -> flow
                            .channel(IP_LOCATION_RESOLVE_CACHE_CHANNEL)
                    )
                    .subscribe(flow -> flow
                            .channel(USER_RECIPIENT_SUBSCRIBE_CACHE_CHANNEL)
                    ))
            .get();
}

@Bean
public RedisStoreWritingMessageHandler ipLocationResolveCacheHandler(RedisTemplate<String, ?> redisTemplate) {
    final RedisStoreWritingMessageHandler ipLocationResolveCacheHandler =
            new RedisStoreWritingMessageHandler(redisTemplate);
    ipLocationResolveCacheHandler.setKey("IP_LOCATION_RESOLVE_CACHE");
    return ipLocationResolveCacheHandler;
}

@Bean
public RedisStoreWritingMessageHandler userRecipientSubscribeCacheHandler(RedisTemplate<String, ?> redisTemplate) {
    final RedisStoreWritingMessageHandler userRecipientSubscribeCacheHandler =
            new RedisStoreWritingMessageHandler(redisTemplate);
    userRecipientSubscribeCacheHandler.setKey("USER_RECIPIENT_SUBSCRIBE_CACHE");
    return userRecipientSubscribeCacheHandler;
}

@Bean
public IpLocationResolveRedisStructureFilterAndTransformer recipientActionHasIpFilterAndTransformer() {
    return new IpLocationResolveRedisStructureFilterAndTransformer();
}

@Bean
public UserRecipientSubscribeDataRedisStructureTransformer subscribeDataRedisStructureTransformer(
        IpLocationClient ipLocationClient) {
    return new UserRecipientSubscribeDataRedisStructureTransformer(ipLocationClient);
}

@Bean
public IntegrationFlow ipLocationResolveCacheFlow(
        @Qualifier("ipLocationResolveCacheHandler") RedisStoreWritingMessageHandler writingMessageHandler) {
    return flow -> flow.channel(IP_LOCATION_RESOLVE_CACHE_CHANNEL)
            .handle(message -> {
                System.out.println("===>ip location  channel message:" + message);
            })

            ;
}

@Bean
public IntegrationFlow userRecipientActionDataCacheFlow(
        @Qualifier("userRecipientSubscribeCacheHandler") RedisStoreWritingMessageHandler messageHandler,
        UserRecipientSubscribeDataRedisStructureTransformer transformer) {
    return flow -> flow.channel(USER_RECIPIENT_SUBSCRIBE_CACHE_CHANNEL)
            .transform(transformer)
                   .handle(message -> {
                System.out.println("===>user recipient  channel message:" + message);
            })      
}  

Я ожидаю 2 сообщения о печати, но только 1.

1 Ответ

0 голосов
/ 29 декабря 2018

Сегодня я обнаружил, что в мостовом потоке могут быть некоторые проблемы. Когда я перемещаю обработчик за каналом userReportWriteCompletedSubscribeChannel, он не может распечатать ни одно сообщение;когда я удалю канал и добавлю обработчик напрямую, он напечатает сообщение.я неправильно использую мост?

   @Bean
public IntegrationFlow bridgeFlow() {
    return flow -> flow.channel("userReportWriteCompletedRouteChannel")
            .bridge(bridgeSpe -> bridgeSpe
                    .poller(pollerFactory -> pollerFactory.fixedRate(100).maxMessagesPerPoll(1)))
            .handle(message -> {
                System.out.println("===>route  channel message:" + message);
            })   // handle ok ,  will print message
            .channel("userReportWriteCompletedSubscribeChannel")
            // .handle(message -> {
            //    System.out.println("===>route  channel message:" + message);
            // })       // handle fail ,  will not printing message
            ;
}

тест:

    @Test
//invalid
public void test_sendMessageToRouteChannel() {
    userReportWriteCompletedRouteChannel.send(createMessageWithIp());
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...