Spring Integration - нет ответа на ответном канале - PullRequest
0 голосов
/ 13 июня 2018

Это дополнительный вопрос к Каналу Spring Integration Executor, использующему пример кода аннотации .

Системная диаграмма прилагается enter image description here.

Я пытаюсь проверить поле, выделенное красным, отправив сообщение в «Общий канал» и прочитав из сообщения REPLY_CHANNEL, установленного в сообщении.

«Общий канал» - это канал публикации подписки.REPLY_CHANNEL - это QueueChannel.

Поскольку это тест JUnit, я смоделировал jdbcTemplate, источник данных и Impl, чтобы игнорировать любые вызовы БД.

Моя проблема: КогдаЯ отправляю сообщение на «Общий канал», я не получаю сообщения на REPLY_CHANNEL.Джунит продолжает ждать ответа.

Что я должен изменить, чтобы получить ответ на REPLY_CHANNEL?

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1 
@ActiveProfiles("test")
public class QueuetoQueueTest {
    @Configuration
    static class ContextConfiguration { ------------------------------------- 2
        @Bean(name = "jdbcTemplate")
        public JdbcTemplate jdbcTemplate() {
            JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
            return jdbcTemplateMock;
        }

        @Bean(name = "dataSource")
        public DataSource dataSource() {
            DataSource dataSourceMock = Mockito.mock(DataSource.class);
            return dataSourceMock;
        }

        @Bean(name = "entityManager")
        public EntityManager entityManager() {
            EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
            return entityManagerMock;
        }

        @Bean(name = "ResponseChannel")
        public QueueChannel getReplyQueueChannel() {
            return new QueueChannel();
        }

//This channel serves as the 'common channel' in the diagram
        @Bean(name = "processRequestSubscribableChannel")
        public MessageChannel getPublishSubscribeChannel() {
            return new PublishSubscribeChannel();
        }
    }

    @Mock
    DBStoreDaoImpl dbStoreDaoImpl;

    @Test
    public void testDBConnectivity() {
        Assert.assertTrue(true);
    }

    @InjectMocks -------------------------------------------------------------- 3
    StoretoDBConfig storetoDBConfig = new StoretoDBConfig();

    @Autowired
    @Qualifier("ResponseChannel")
    QueueChannel ResponseChannel;

    @Autowired
    @Qualifier("processRequestSubscribableChannel")
    MessageChannel processRequestSubscribableChannel;

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void outboundtoQueueTest() {
        try {
            when(dbStoreDaoImpl.storeToDB(any()))
                    .thenReturn(1); ----------------------------------------------- 4

            //create message
            Message message = (Message<String>) MessageBuilder
                    .withPayload("Hello")
                    .setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
                    .build();

            //send message
            processRequestSubscribableChannel.send(message);
            System.out
                    .println("Listening on InstructionResponseHandlertoEventProcessorQueue");

            //wait for response on reply channel
            Message<?> response = ResponseChannel.receive(); ----------------------- 5
            System.out.println("***************RECEIVED: "
                    + response.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. Загрузить 'ContextConfiguration'для JUnit, чтобы к БД не обращались.

  2. Это - то, как вы загружаете пользовательскую конфигурацию в JUnit согласно https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles

Внутри класса конфигурации,мы макетируем jdbcTemplate, dataSource, entityManager и определяем «общий канал», на котором размещен запрос, и ResponseChannel.

Внедрить jdbcTemplate, источник данных mock в StoretoDBConfig, чтобы не вызывать ошибку БД

Класс Mock DaoImpl, чтобы игнорировать вызовы БД

Тестовые блоки здесь, потому что нет ответа на REPLY_CHANNEL

ОБНОВЛЕННЫЙ КОД:

Code inside 5 (the class that reads from common channel):

    @Configuration
    class HandleRequestConfig {

        //Common channel - refer diagram
        @Autowired
        PublishSubscribeChannel processRequestSubscribableChannel;

        //Step 9 - This channel is used to send queue to the downstream system
        @Autowired
        PublishSubscribeChannel forwardToExternalSystemQueue;

        public void handle() {
            IntegrationFlows.from("processRequestSubscribableChannel")          // Read from 'Common channel'
            .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
            .handle(RequestProcessor,"validateMessage")                         // Perform custom business logic - no logic for now, return the msg as is   
            .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
            .channel("forwardToExternalSystemQueue");                           // Post to 'Channel to another system' 
            }

    }

    //Code inside step 8 - 'Custom Business Logic' 
    @Configuration
    class RequestProcessor {
        public Message<?> validateMessage(Message<?> msg) {
            return msg;
        }
    }

ЧТО ЯПЫТАЮСЬ ДОСТИГНУТЬ: У меня есть отдельные тестовые примеры для бизнес-логики.Я пытаюсь проверить, что когда запрос публикуется в «общем канале», ответ поступает по «каналу в другую систему».

Почему я не могу использовать оригинальный ApplicationContext: Потому что он подключается к БД, и я не хочу, чтобы мой JUnit подключался к БД или использовал встроенную базу данных.Я хочу, чтобы любые вызовы в БД игнорировались.

Я установил канал ответа 'ResponseChannel', не должна ли 'Custom Business Logic' отправлять свой ответ на 'ResponseChannel'?

Если мне нужно прослушать ответ на другом канале, я готов это сделать.Все, что я хочу проверить, это то, получено ли сообщение, которое я отправляю по «общему каналу», по «каналу в другую систему».

ОБНОВЛЕНИЕ 2: Решение вопросов Артема.Спасибо Артему за ваши предложения.

Включен ли 'HandlerRequestConfig' в тестовую конфигурацию? - Мы не можем напрямую вызвать метод handle ().Вместо этого я подумал, что если я отправлю сообщение на processRequestSubscribeableChannel, метод handle () внутри HandleRequestConfig будет вызван, поскольку он прослушивает тот же канал.Это неправильно?Как мне тогда протестировать метод HandleRequestConfig.handle ()?

Я добавил прослушивание в конце каждого шага в HandleRequestConfig (код обновлен).Я считаю, что ни одно из сообщений прослушивания не распечатывается.Это означает, что сообщение, которое я отправляю, даже не достигает входного канала processRequestSubscribeableChannel.Что я делаю не так?

ПРИМЕЧАНИЕ. Я попытался удалить бин 'processRequestSubscribeableChannel' из конфигурации (чтобы использовался фактический 'processRequestSubscribeableChannel' в applicationContext).Я получаю неудовлетворенную ошибку зависимости - Ожидается, по крайней мере, 1 компонент с конфигурацией PublishSubscribeChannel.

Обновление 3: отправлены подробности, запрошен Артем.

@RunWith(SpringRunner.class)
@SpringBootTest
public class QueuetoQueueTest  {

//  Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
        @MockBean
        @Qualifier("jdbcTemplate")
        JdbcTemplate jdbcTemplate;

        @MockBean
        @Qualifier("dataSource")
        public DataSource dataSource;

        @MockBean
        @Qualifier("entityManager")
        public EntityManager entityManager;

        @Bean(name = "ResponseChannel")
        public PublishSubscribeChannel getReplyQueueChannel() {
            return new PublishSubscribeChannel();
        }

        //Mocking the DB class
        @MockBean
        @Qualifier("dbStoreDaoImpl")
        DBStoreDaoImpl  dbStoreDaoImpl ;


        //Inject the mock objects created above into the flow that stores data into the DB.
        @InjectMocks
        StoretoDBConfig storetoDBConfig = new StoretoDBConfig();

//Step 2 - Injecting MessageChannel used in the actual ApplicationContext
        @Autowired
        @Qualifier("processRequestSubscribableChannel")
        MessageChannel processRequestSubscribableChannel;

        @Before
        public void setUp() throws Exception {
            MockitoAnnotations.initMocks(this);
        }

        @Test
        public void outboundtoQueueTest() {
        try {
            when(dbStoreDaoImpl.storeToDB(any()))
                    .thenReturn(1);

            //create message
            Message message = (Message<?>) MessageBuilder
                    .withPayload("Hello")
                    .build();
            //send message - this channel is the actual channel used in ApplicationContext
            processRequestSubscribableChannel.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ОШИБКА, КОТОРУЮ Я ПОЛУЧАЮ: Код пытается подключиться кБД и выдает ошибку.

ОБНОВЛЕНИЕ 1: Код внутри StoretoDBConfig

@Configuration
@EnableIntegration
public class StoretoDBConfig {
    @Autowired
    DataSource dataSource;

/*
 * Below code is irrelevant to our current problem - Including for reference.
 * 
 * storing into DB is delegated to a separate thread.
 *
 *  @Bean
 *  public TaskExecutor taskExecutor() {
 *      return new SimpleAsyncTaskExecutor();
 *  }
 *  
 *  @Bean(name="executorChannelToDB")
 *  public ExecutorChannel outboundRequests() {
 *      return new ExecutorChannel(taskExecutor());
 *  }
 *  @Bean(name = "DBFailureChannel")
 *  public static MessageChannel getFailureChannel() {
 *      return new DirectChannel();
 *  }
 *  private static final Logger logger = Logger
 *              .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
*/
    @Bean
    public IntegrationFlow handle() {
    /*
     * Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
     *
     /
        return IntegrationFlows
                .from("processRequestSubscribableChannel")
                .channel("executorChannelToDB").get();
    }
}

КОД, КОТОРЫЙ ХРАНИТ В БД НА ОТДЕЛЬНОЙ РЕЗЬБЕ:

@Repository
public class DBStoreDaoImpl implements DBStoreDao {
    private JdbcTemplate jdbcTemplate;

    @Autowired
    public void setJdbcTemplate(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    @ServiceActivator(inputChannel = "executorChannelToDB")
    public void storetoDB(Message<?> msg) throws Exception {
            String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
            jdbcTemplate.update(insertQuery, msg.toString());
    }
}

1 Ответ

0 голосов
/ 13 июня 2018

Пожалуйста, покажите нам, что подписано на это Common channel.Ваша диаграмма как-то не связана с тем, что вы нам показываете.Код, который вы демонстрируете, не полный.

Реальная проблема с replyChannel в том, что что-то действительно должно отправить ему сообщение.Если ваш поток просто односторонний - отправить, сохранить и ничего не вернуть, - тогда вы действительно ничего не получите за этот.Вот почему бы показывать эти адаптеры каналов.

Лучший способ наблюдать за отправкой сообщений - это включить ведение журнала отладки для категории org.springframework.integration.

Хотя я вижу, что вы объявляете эти каналыкак в ContextConfiguration и подписчиков на getRequestChannel действительно нет.Поэтому никто не собирается использовать ваше сообщение и, конечно, никто не собирается отправлять вам ответ.

Пожалуйста, пересмотрите свой тестовый класс, чтобы использовать реальный контекст приложения.Иначе совершенно неясно, чего бы вы хотели достичь, если бы вы действительно не проверяли свой поток ...

...