Spring Integration: Разделить сообщение снова после использования агрегата? - PullRequest
4 голосов
/ 05 августа 2011

В моем проекте на базе Spring Integration у меня есть сплиттер и полезный маршрутизатор для отправки моих данных на различные преобразователи. Новые «преобразованные» объекты затем передаются обратно в агрегатор и обрабатываются.

Теперь я хочу разделить мои агрегированные результаты, чтобы они были сохранены должным образом, поскольку мне нужно направить некоторые объекты в отдельный адаптер исходящего канала. Чтобы добиться этого, я добавил второй сплиттер после моего агрегатора; но, похоже, только первый элемент в агрегированном наборе передается на маршрутизатор.

Это мой текущий поток:

<splitter ref="articleContentExtractor" />

<!-- This router works exactly as expected -->
<payload-type-router>
    ... routing to various transformers ...
    ... results are sent to articleOutAggregateChannel ...
</payload-type-router>

<aggregator ref="articleAggregator" />

<splitter />

<!-- This is where it seems to go wrong, the second
        splitter returns only the first object in the collection -->    
<payload-type-router resolution-required="true">
    <mapping type="x.y.z.AbstractContent" channel="contentOutChannel" />
    <mapping type="x.y.z.Staff" channel="staffOutChannel" />
</payload-type-router>

<outbound-channel-adapter id="contentSaveService" ref="contentExporter" 
    method="persist" channel="contentOutChannel" />

<outbound-channel-adapter id="staffSaveService" ref="staffExporter" 
    method="persist" channel="staffOutChannel" />

И мой код Агрегатора:

@Aggregator
public List<? super BaseObject> compileArticle(List<? super BaseObject> parts) {

    // Search for the required objects for referencing
    Iterator<? super BaseObject> it = parts.iterator();
    Article article = null;
    List<Staff> authors = new ArrayList<Staff>();

    while (it.hasNext()) {
        Object part = it.next();
        if (part instanceof Article) {
            article = (Article)part;
        }
        else if (part instanceof Staff) {
            authors.add((Staff)part);
        }
    }

    // Apply references
    article.setAuthors(authors);

    return parts;
}

Что я делаю не так? Правильно ли я использую свой агрегатор?

Примечание. Если я просто удалю и агрегатор, и второй сплиттер, остальная часть потока будет работать идеально.

1 Ответ

1 голос
/ 18 августа 2011

Трудно рассказать обо всем, что происходит, потому что у меня нет всего вашего кода, но я смог заставить этот поток работать так, как я думаю, вы хотели. BaseObjectTransformer ничего не делает, только устанавливает флаг для объектов, проходящих через него:

...
<context:component-scan base-package="net.grogscave.example" />

<channel id="inputChannel" />

<splitter ref="articleContentExtractor" input-channel="inputChannel"
    output-channel="splitArticleStaff" />

<channel id="splitArticleStaff" />

<payload-type-router input-channel="splitArticleStaff">
    <mapping type="net.grogscave.example.domain.Article" channel="articleChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffChannel" />
</payload-type-router>

<channel id="articleChannel" />

<transformer input-channel="articleChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="staffChannel" />

<transformer input-channel="staffChannel" output-channel="articleOutAggregateChannel"
    ref="baseObjectTransformer"/>

<channel id="articleOutAggregateChannel" />

<aggregator ref="articleAggregator" input-channel="articleOutAggregateChannel" output-channel="splitArticleInChannel"/>

<channel id="splitArticleInChannel" />

<splitter input-channel="splitArticleInChannel" output-channel="splitArticleOutChannel" />

<channel id="splitArticleOutChannel" />

<payload-type-router resolution-required="true" input-channel="splitArticleOutChannel">
    <mapping type="net.grogscave.example.domain.Article" channel="contentOutChannel" />
    <mapping type="net.grogscave.example.domain.Staff" channel="staffOutChannel" />
</payload-type-router>

<channel id="contentOutChannel">
    <queue capacity="10" />
</channel>

<channel id="staffOutChannel">
    <queue capacity="10" />
</channel>
...

Затем я просто выполняю этот поток со следующим кодом в тестовом примере:

    ...
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
            "/META-INF/spring/split-agg-split.xml",
            SplitAggSplitTests.class);

    MessageChannel inputChannel = context.getBean("inputChannel",
            MessageChannel.class);

    PollableChannel contentOutChannel = context.getBean("contentOutChannel",
            PollableChannel.class);

    PollableChannel staffOutChannel = context.getBean("staffOutChannel",
            PollableChannel.class);

    inputChannel.send(new GenericMessage<String>("Dewey Wins!,A. Fool, C. Lewlis"));

    logger.info("==> Article recieved: "
            + contentOutChannel.receive(0).getPayload());

    for(int i = 0; i < 2; i++)
    {
        logger.info("==> Staff recieved: "
                + staffOutChannel.receive(0).getPayload());
    }
    ...

Мой вывод журнала производит все три объекта.

С учетом всего сказанного, вы рассматривали возможность простого удаления дополнительного сплиттера / агрегатора и простого перемещения логики setAuthors в свой первый сплиттер? Я не знаю всех деталей вашего потока, но это может упростить ситуацию.

...