Как создать собственный агрегатор в Mule? - PullRequest
1 голос
/ 01 октября 2011

Каков рекомендуемый способ создания полностью собственного агрегатора в mule 3.x? Под полностью индивидуальным подходом я подразумеваю согласно своей собственной логике, не используя идентификаторы корреляции, количество сообщений и т. Д.

Документация на сайте mulesoft устарела, говоря о том, что используется AbstractEventAggregator, которого нет в 3.x:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

Копаем глубже, похоже, этот класс был переименован в AbstractAggregator в 3.x:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

Однако нет примеров, показывающих, как это использовать. Пример LoanBroker, описанный в первой ссылке выше, на самом деле использует агрегатор корреляции (в примерах 2.x я предполагаю, что документ имеет в виду).

В какой-то момент существовал абстрактный класс, у которого были абстрактные методы shouldAggregate и doAggregate. Это тот класс, который я хотел бы расширить.

1 Ответ

4 голосов
/ 01 октября 2011

Посмотрите на TestAggregator ниже пример подкласса AbstractAggregator.

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;
import org.mule.util.concurrent.ThreadNameHelper;

import java.util.Iterator;

public class TestAggregator extends AbstractAggregator
{
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
    {
        return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
        {
            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
            {
                StringBuffer buffer = new StringBuffer(128);

                try
                {
                    for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                    {
                        MuleEvent event = iterator.next();
                        try
                        {
                            buffer.append(event.transformMessageToString());
                        }
                        catch (TransformerException e)
                        {
                            throw new AggregationException(events, null, e);
                        }
                    }
                }
                catch (ObjectStoreException e)
                {
                    throw new AggregationException(events,null,e);
                }

                logger.debug("event payload is: " + buffer.toString());
                return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
            }
        };
    }
}
...