Это дополнительный вопрос к Каналу Spring Integration Executor, использующему пример кода аннотации .
Системная диаграмма прилагается .
Я пытаюсь проверить поле, выделенное красным, отправив сообщение в «Общий канал» и прочитав из сообщения REPLY_CHANNEL, установленного в сообщении.
«Общий канал» - это канал публикации подписки.REPLY_CHANNEL - это QueueChannel.
Поскольку это тест JUnit, я смоделировал jdbcTemplate, источник данных и Impl, чтобы игнорировать любые вызовы БД.
Моя проблема: КогдаЯ отправляю сообщение на «Общий канал», я не получаю сообщения на REPLY_CHANNEL.Джунит продолжает ждать ответа.
Что я должен изменить, чтобы получить ответ на REPLY_CHANNEL?
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1
@ActiveProfiles("test")
public class QueuetoQueueTest {
@Configuration
static class ContextConfiguration { ------------------------------------- 2
@Bean(name = "jdbcTemplate")
public JdbcTemplate jdbcTemplate() {
JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
return jdbcTemplateMock;
}
@Bean(name = "dataSource")
public DataSource dataSource() {
DataSource dataSourceMock = Mockito.mock(DataSource.class);
return dataSourceMock;
}
@Bean(name = "entityManager")
public EntityManager entityManager() {
EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
return entityManagerMock;
}
@Bean(name = "ResponseChannel")
public QueueChannel getReplyQueueChannel() {
return new QueueChannel();
}
//This channel serves as the 'common channel' in the diagram
@Bean(name = "processRequestSubscribableChannel")
public MessageChannel getPublishSubscribeChannel() {
return new PublishSubscribeChannel();
}
}
@Mock
DBStoreDaoImpl dbStoreDaoImpl;
@Test
public void testDBConnectivity() {
Assert.assertTrue(true);
}
@InjectMocks -------------------------------------------------------------- 3
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
@Autowired
@Qualifier("ResponseChannel")
QueueChannel ResponseChannel;
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1); ----------------------------------------------- 4
//create message
Message message = (Message<String>) MessageBuilder
.withPayload("Hello")
.setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
.build();
//send message
processRequestSubscribableChannel.send(message);
System.out
.println("Listening on InstructionResponseHandlertoEventProcessorQueue");
//wait for response on reply channel
Message<?> response = ResponseChannel.receive(); ----------------------- 5
System.out.println("***************RECEIVED: "
+ response.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Загрузить 'ContextConfiguration'для JUnit, чтобы к БД не обращались.
Это - то, как вы загружаете пользовательскую конфигурацию в JUnit согласно https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles
Внутри класса конфигурации,мы макетируем jdbcTemplate, dataSource, entityManager и определяем «общий канал», на котором размещен запрос, и ResponseChannel.
Внедрить jdbcTemplate, источник данных mock в StoretoDBConfig, чтобы не вызывать ошибку БД
Класс Mock DaoImpl, чтобы игнорировать вызовы БД
Тестовые блоки здесь, потому что нет ответа на REPLY_CHANNEL
ОБНОВЛЕННЫЙ КОД:
Code inside 5 (the class that reads from common channel):
@Configuration
class HandleRequestConfig {
//Common channel - refer diagram
@Autowired
PublishSubscribeChannel processRequestSubscribableChannel;
//Step 9 - This channel is used to send queue to the downstream system
@Autowired
PublishSubscribeChannel forwardToExternalSystemQueue;
public void handle() {
IntegrationFlows.from("processRequestSubscribableChannel") // Read from 'Common channel'
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
.handle(RequestProcessor,"validateMessage") // Perform custom business logic - no logic for now, return the msg as is
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
.channel("forwardToExternalSystemQueue"); // Post to 'Channel to another system'
}
}
//Code inside step 8 - 'Custom Business Logic'
@Configuration
class RequestProcessor {
public Message<?> validateMessage(Message<?> msg) {
return msg;
}
}
ЧТО ЯПЫТАЮСЬ ДОСТИГНУТЬ: У меня есть отдельные тестовые примеры для бизнес-логики.Я пытаюсь проверить, что когда запрос публикуется в «общем канале», ответ поступает по «каналу в другую систему».
Почему я не могу использовать оригинальный ApplicationContext: Потому что он подключается к БД, и я не хочу, чтобы мой JUnit подключался к БД или использовал встроенную базу данных.Я хочу, чтобы любые вызовы в БД игнорировались.
Я установил канал ответа 'ResponseChannel', не должна ли 'Custom Business Logic' отправлять свой ответ на 'ResponseChannel'?
Если мне нужно прослушать ответ на другом канале, я готов это сделать.Все, что я хочу проверить, это то, получено ли сообщение, которое я отправляю по «общему каналу», по «каналу в другую систему».
ОБНОВЛЕНИЕ 2: Решение вопросов Артема.Спасибо Артему за ваши предложения.
Включен ли 'HandlerRequestConfig' в тестовую конфигурацию? - Мы не можем напрямую вызвать метод handle ().Вместо этого я подумал, что если я отправлю сообщение на processRequestSubscribeableChannel, метод handle () внутри HandleRequestConfig будет вызван, поскольку он прослушивает тот же канал.Это неправильно?Как мне тогда протестировать метод HandleRequestConfig.handle ()?
Я добавил прослушивание в конце каждого шага в HandleRequestConfig (код обновлен).Я считаю, что ни одно из сообщений прослушивания не распечатывается.Это означает, что сообщение, которое я отправляю, даже не достигает входного канала processRequestSubscribeableChannel.Что я делаю не так?
ПРИМЕЧАНИЕ. Я попытался удалить бин 'processRequestSubscribeableChannel' из конфигурации (чтобы использовался фактический 'processRequestSubscribeableChannel' в applicationContext).Я получаю неудовлетворенную ошибку зависимости - Ожидается, по крайней мере, 1 компонент с конфигурацией PublishSubscribeChannel.
Обновление 3: отправлены подробности, запрошен Артем.
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueuetoQueueTest {
// Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
@MockBean
@Qualifier("jdbcTemplate")
JdbcTemplate jdbcTemplate;
@MockBean
@Qualifier("dataSource")
public DataSource dataSource;
@MockBean
@Qualifier("entityManager")
public EntityManager entityManager;
@Bean(name = "ResponseChannel")
public PublishSubscribeChannel getReplyQueueChannel() {
return new PublishSubscribeChannel();
}
//Mocking the DB class
@MockBean
@Qualifier("dbStoreDaoImpl")
DBStoreDaoImpl dbStoreDaoImpl ;
//Inject the mock objects created above into the flow that stores data into the DB.
@InjectMocks
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
//Step 2 - Injecting MessageChannel used in the actual ApplicationContext
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1);
//create message
Message message = (Message<?>) MessageBuilder
.withPayload("Hello")
.build();
//send message - this channel is the actual channel used in ApplicationContext
processRequestSubscribableChannel.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ОШИБКА, КОТОРУЮ Я ПОЛУЧАЮ: Код пытается подключиться кБД и выдает ошибку.
ОБНОВЛЕНИЕ 1: Код внутри StoretoDBConfig
@Configuration
@EnableIntegration
public class StoretoDBConfig {
@Autowired
DataSource dataSource;
/*
* Below code is irrelevant to our current problem - Including for reference.
*
* storing into DB is delegated to a separate thread.
*
* @Bean
* public TaskExecutor taskExecutor() {
* return new SimpleAsyncTaskExecutor();
* }
*
* @Bean(name="executorChannelToDB")
* public ExecutorChannel outboundRequests() {
* return new ExecutorChannel(taskExecutor());
* }
* @Bean(name = "DBFailureChannel")
* public static MessageChannel getFailureChannel() {
* return new DirectChannel();
* }
* private static final Logger logger = Logger
* .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
*/
@Bean
public IntegrationFlow handle() {
/*
* Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
*
/
return IntegrationFlows
.from("processRequestSubscribableChannel")
.channel("executorChannelToDB").get();
}
}
КОД, КОТОРЫЙ ХРАНИТ В БД НА ОТДЕЛЬНОЙ РЕЗЬБЕ:
@Repository
public class DBStoreDaoImpl implements DBStoreDao {
private JdbcTemplate jdbcTemplate;
@Autowired
public void setJdbcTemplate(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
@Transactional(rollbackFor = Exception.class)
@ServiceActivator(inputChannel = "executorChannelToDB")
public void storetoDB(Message<?> msg) throws Exception {
String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
jdbcTemplate.update(insertQuery, msg.toString());
}
}