Что такое эффективный метод для упорядоченной обработки событий с использованием CCR? - PullRequest
2 голосов
/ 23 марта 2011

Я экспериментировал с итераторами CCR в качестве решения задачи, которая требует параллельной обработки множества каналов данных, где данные из каждого канала должны обрабатываться по порядку.Ни один из каналов не зависит друг от друга, поэтому обработка заказов может быть параллельной для каждого канала.

Ниже приведен быстрый и грязный макет с одним целочисленным потоком, который просто помещает целые числа в порт наскорость около 1,5 К / с, а затем вытаскивает их с помощью итератора CCR, чтобы сохранить гарантию обработки заказов.

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

Что меня очень удивило в этом, так это то, что CCR не мог не отставать отКоробка Corei7, размер очереди растет без границ.В другом тесте для измерения задержки между Post () и Receive () под нагрузкой или ~ 100 Post / sec., Задержка между первым Post () и Receive () в каждом пакете была около 1 мс.

Что-то не так с моим макетом?Если это так, что является лучшим способом сделать это с помощью CCR?

1 Ответ

1 голос
/ 07 апреля 2011

Да, я согласен, это действительно кажется странным. Сначала кажется, что ваш код работает без сбоев, но после нескольких тысяч элементов загрузка процессора возрастает до такой степени, что производительность действительно снижается. Это беспокоит меня и предлагает проблему в рамках. После игры с вашим кодом я не могу понять, почему это так. Я бы предложил перенести эту проблему на форумы Microsoft Robotics и посмотреть, сможете ли вы попросить Джорджа Хризантакопулоса (или один из других мозгов CCR) рассказать вам, в чем проблема. Однако я могу предположить, что ваш код в его нынешнем виде ужасно неэффективен.

То, как вы справляетесь с «выталкиванием» предметов из порта, очень неэффективно. По сути, итератор просыпается каждый раз, когда в порту появляется сообщение, и он обрабатывает только одно сообщение (несмотря на то, что в порту может быть еще несколько сотен), а затем зависает на yield, пока управление передается обратно рамки. В тот момент, когда данный получатель вызывает другое «пробуждение» итератора, многие многие сообщения заполнили порт. Извлечение потока из Dispatcher для обработки только одного элемента (когда многие накопили за это время) почти наверняка не лучший способ получить хорошую пропускную способность.

Я изменил ваш код так, что после выхода мы проверяем порт, чтобы увидеть, есть ли еще какие-либо сообщения, поставленные в очередь, и имеем дело с ними, тем самым полностью опустошая порт, прежде чем мы вернемся к фреймворку. Я также немного изменил ваш код для использования CcrServiceBase, что упрощает синтаксис некоторых задач, которые вы выполняете:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

В качестве альтернативы, вы можете полностью отказаться от итератора, поскольку он не очень хорошо используется в вашем примере (хотя я не совсем уверен, как это повлияет на порядок обработки):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Оба этих примера значительно увеличивают пропускную способность на порядки. Надеюсь, это поможет.

...