Я использую Spring Integration для создания следующего потока: Входной канал -> Разветвитель -> Трансформатор -> Активатор службы -> Агрегатор
Трансформатор и Активатор службы связаны и выполняются с помощью задачи-исполнителя. Во время исполнения приложения проблем нет. Но когда я пытаюсь запустить модульные тесты, поток-исполнитель, выполняющий Service Activator, таинственным образом завершает свою работу, если есть долго выполняемая задача. Чтобы продемонстрировать это, я создал пример проекта со следующей конфигурацией:
<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
channel="helloWorldChannel"/>
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
<bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
<int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>
<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
<int:transformer>
<bean id="stringTransformer" class="hello.Transformer"></bean>
</int:transformer>
<int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
<bean class="hello.ResponseAggregator"/>
</int:aggregator>
Это класс Splitter:
public class Splitter {
public List<String> splitMessage(Message message) {
String msg = message.getPayload().toString();
return Arrays.asList(msg.split(","));
}
}
Это класс Transformer:
public class Transformer {
public String transform(Message message) {
String msg = message.getPayload().toString();
return msg+"t";
}
}
Это класс Service Activator:
@Service
public class HelloWorldAmqService {
public Message processMsg(String msg) throws InterruptedException {
DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date date = new Date();
//Simulate long running process
if(msg.equalsIgnoreCase("1t")){
Thread.sleep(500);
System.out.println("After first sleep");
Thread.sleep(800);
}
System.out.println("*************"+ msg + " as of "+sdf.format(date)+" *********** " );
return MessageBuilder.withPayload(msg).build();
}
}
Чтобы смоделировать долгосрочное задание, я добавил Thread.sleep()
в методе processMsg.
Это класс ResponseAggregator:
@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
StringBuilder builder = new StringBuilder();
for (Message message : group.getMessages()) {
builder.append(message.getPayload());
}
System.out.println(builder.toString());
return MessageBuilder.withPayload(builder.toString()).build();
}
}
Я написал модульный тест, чтобы отправить пример сообщения на канал и проверить поведение. Но всякий раз, когда поток Service Activator обрабатывает более 1000 мс, он завершает работу без предупреждения. Вот модульный тест
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {
@Autowired
private MessageChannel helloWorldChannel;
@Autowired
private HelloWorldAmqService hello;
@Autowired
@Qualifier("jmsConnectionFactory")
ConnectionFactory jmsConnectionFactory;
@Test
public void test() {
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
assertThat(true);
}
}
Обычно можно ожидать, что выход будет иметь агрегированные результаты, то есть 1t2t3t4t6t (порядок может быть другим). Но приложение не достигает агрегатора вообще. Поток, отвечающий за обработку «1t», завершает работу, и агрегатор вообще не запускается, поскольку поведение по умолчанию заключается в ожидании получения всех сообщений.
Это ответ, если я оставлю нить дольше в спящем режиме. то есть Thread.sleep(1000)
*************2t as of 2019/10/14 17:29:58 ***********
*************3t as of 2019/10/14 17:29:58 ***********
*************4t as of 2019/10/14 17:29:58 ***********
*************6t as of 2019/10/14 17:29:58 ***********
After first sleep
Это ответ, если я позволю потоку спать в течение более короткого периода времени, то есть Thread.sleep(200)
*************2t as of 2019/10/14 17:31:53 ***********
*************4t as of 2019/10/14 17:31:53 ***********
*************6t as of 2019/10/14 17:31:53 ***********
*************3t as of 2019/10/14 17:31:53 ***********
After first sleep
*************1t as of 2019/10/14 17:31:53 ***********
2t3t6t4t1t