Исполнитель задачи Spring Integration умирает без предупреждения примерно через 1000 мс во время тестирования - PullRequest
0 голосов
/ 14 октября 2019

Я использую Spring Integration для создания следующего потока: Входной канал -> Разветвитель -> Трансформатор -> Активатор службы -> Агрегатор

Трансформатор и Активатор службы связаны и выполняются с помощью задачи-исполнителя. Во время исполнения приложения проблем нет. Но когда я пытаюсь запустить модульные тесты, поток-исполнитель, выполняющий Service Activator, таинственным образом завершает свою работу, если есть долго выполняемая задача. Чтобы продемонстрировать это, я создал пример проекта со следующей конфигурацией:

<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
    channel="helloWorldChannel"/>    
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
    <bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
    <int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>

<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
    <int:transformer>
        <bean id="stringTransformer" class="hello.Transformer"></bean>
    </int:transformer>
    <int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
    <bean class="hello.ResponseAggregator"/>
</int:aggregator>

Это класс Splitter:

public class Splitter {

public List<String> splitMessage(Message message)  {
    String msg = message.getPayload().toString();
    return Arrays.asList(msg.split(","));
}

}

Это класс Transformer:

public class Transformer {

public String transform(Message message)  {
    String msg = message.getPayload().toString();
    return msg+"t";
}

}

Это класс Service Activator:


@Service
public class HelloWorldAmqService {

    public Message processMsg(String msg) throws InterruptedException {
        DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date date = new Date();
        //Simulate long running process
        if(msg.equalsIgnoreCase("1t")){
            Thread.sleep(500);
            System.out.println("After first sleep");
            Thread.sleep(800);
        }
        System.out.println("*************"+ msg + " as of "+sdf.format(date)+" ***********  " );
        return MessageBuilder.withPayload(msg).build();

    }
}

Чтобы смоделировать долгосрочное задание, я добавил Thread.sleep() в методе processMsg.

Это класс ResponseAggregator:

@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        StringBuilder builder = new StringBuilder();
        for (Message message : group.getMessages()) {
            builder.append(message.getPayload());
        }
        System.out.println(builder.toString());
        return MessageBuilder.withPayload(builder.toString()).build();
    }
}

Я написал модульный тест, чтобы отправить пример сообщения на канал и проверить поведение. Но всякий раз, когда поток Service Activator обрабатывает более 1000 мс, он завершает работу без предупреждения. Вот модульный тест

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {

    @Autowired
    private MessageChannel helloWorldChannel;

    @Autowired
    private HelloWorldAmqService hello;

    @Autowired
    @Qualifier("jmsConnectionFactory")
    ConnectionFactory jmsConnectionFactory;

    @Test
    public void test() {

        helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
        assertThat(true);
    }

}

Обычно можно ожидать, что выход будет иметь агрегированные результаты, то есть 1t2t3t4t6t (порядок может быть другим). Но приложение не достигает агрегатора вообще. Поток, отвечающий за обработку «1t», завершает работу, и агрегатор вообще не запускается, поскольку поведение по умолчанию заключается в ожидании получения всех сообщений.

Это ответ, если я оставлю нить дольше в спящем режиме. то есть Thread.sleep(1000)

*************2t as of 2019/10/14 17:29:58 ***********  
*************3t as of 2019/10/14 17:29:58 ***********  
*************4t as of 2019/10/14 17:29:58 ***********  
*************6t as of 2019/10/14 17:29:58 ***********  


After first sleep

Это ответ, если я позволю потоку спать в течение более короткого периода времени, то есть Thread.sleep(200)

*************2t as of 2019/10/14 17:31:53 ***********  
*************4t as of 2019/10/14 17:31:53 ***********  
*************6t as of 2019/10/14 17:31:53 ***********  
*************3t as of 2019/10/14 17:31:53 ***********  


After first sleep
*************1t as of 2019/10/14 17:31:53 ***********  
2t3t6t4t1t

1 Ответ

0 голосов
/ 14 октября 2019

Ваша проблема в том, что вам не хватает факта, что ваше приложение является async, но ваш тест не имеет ничего общего с многопоточным решением.

Вы отправляете сообщение в тестовом методе и ничего не делаете, чтобы дождаться выходного сообщения. Поэтому основной поток, который инициировал выполнение теста, просто существует, оставляя позади все выполнение в других потоках.

Ваша идея с отправкой на helloWorldChannel вместо работы с JMS-адресатом является хорошим выбором. Только проблема в том, что вы не ждете результата потока после агрегации.

Странно также выводить конечную точку в errorChannel, но вы можете подписаться на нее из своего тестового примера перед выдачей сообщения:

@Autowired
private SubscribableChannel errorChannel;


@Test
public void test() {
    SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();
    this.errorChannel.subscribe((message) -> messageFuture.set(message));
    helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
    Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);
    ...
}

Этонезависимо от поведения потока, ваш основной поток JUnit будет ожидать результата в этом Future.

...