Ожидание завершения всех потоков в Spring Integration - PullRequest
15 голосов
/ 19 января 2012

У меня есть исполняемая jar-программа, которая сильно зависит от Spring Integration.Проблема, с которой я сталкиваюсь, заключается в том, что программа завершает работу до того, как другие компоненты Spring полностью завершат .

Ниже приведена сокращенная версия кода, который я использую.больше кода / конфигурации, если это необходимо.Точкой входа является метод main (), который загружает Spring и запускает процесс импорта:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter содержит простой цикл, который отправляет сообщения в шлюз Spring Integration.Это обеспечивает активный «толкающий» подход к потоку, а не общий подход к опросу данных.Вот где возникает моя проблема:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

Для полноты, поток XML выглядит примерно так:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

Поток запускается и обрабатывает элементы эффективно, но после startImport () цикл завершает работу основного потока и немедленно завершает все потоки Spring Integration.Это приводит к состоянию гонки, последние (n) элементы не полностью обрабатываются при завершении программы.

У меня есть идея поддерживать счетчик ссылок на элементы, которые я обрабатываю, но это подтверждаетсядовольно сложный, поскольку поток часто разделяет / маршрутизирует сообщения нескольким активаторам службы, что означает, что трудно определить, «закончен» ли каждый элемент.

Мне кажется, что мне нужен какой-то способ либо проверить, что бины Spring по-прежнему не выполняются, либо отметить, что все элементы, отправленные на шлюз, были полностью обработаны перед завершением.

У меня вопрос: как мне поступить так или иначе, или есть лучший подход к моей проблеме, о котором я не думал?

Ответы [ 2 ]

10 голосов
/ 19 января 2012

Вы не используете здесь шаблон запроса-ответа.

outbound-channel-adapter - это действие «забей и забудь». Если вы хотите дождаться ответа, вы должны использовать исходящий шлюз, который будет ждать ответа, и подключить ответ к исходному шлюзу, тогда в java sendAndReceive not просто опубликуйте.

3 голосов
/ 19 января 2012

Если вы можете получить Item, чтобы определить, по-прежнему ли это нужно или нет (processingFinished () или что-то подобное, выполненное на внутренних этапах), вы можете зарегистрировать все Item в центральном органе , который отслеживает количество незавершенных Item с и эффективно определяет условие завершения.

Если такой подход возможен, вы можете даже подумать об упаковке предметов в FutureTask -объекты или использовать аналогичные понятия из java.util.concurrent.

Редактировать: Вторая идея:

Задумывались ли вы о том, чтобы сделать каналы более интеллектуальными? Отправитель закрывает канал, если больше не отправляет данные. В этом сценарии рабочие компоненты не обязательно должны быть потоками deamon, но могут определять критерий завершения на основе закрытого и пустого входного канала.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...