Да, я согласен, это действительно кажется странным. Сначала кажется, что ваш код работает без сбоев, но после нескольких тысяч элементов загрузка процессора возрастает до такой степени, что производительность действительно снижается. Это беспокоит меня и предлагает проблему в рамках. После игры с вашим кодом я не могу понять, почему это так. Я бы предложил перенести эту проблему на форумы Microsoft Robotics и посмотреть, сможете ли вы попросить Джорджа Хризантакопулоса (или один из других мозгов CCR) рассказать вам, в чем проблема. Однако я могу предположить, что ваш код в его нынешнем виде ужасно неэффективен.
То, как вы справляетесь с «выталкиванием» предметов из порта, очень неэффективно. По сути, итератор просыпается каждый раз, когда в порту появляется сообщение, и он обрабатывает только одно сообщение (несмотря на то, что в порту может быть еще несколько сотен), а затем зависает на yield
, пока управление передается обратно рамки. В тот момент, когда данный получатель вызывает другое «пробуждение» итератора, многие многие сообщения заполнили порт. Извлечение потока из Dispatcher для обработки только одного элемента (когда многие накопили за это время) почти наверняка не лучший способ получить хорошую пропускную способность.
Я изменил ваш код так, что после выхода мы проверяем порт, чтобы увидеть, есть ли еще какие-либо сообщения, поставленные в очередь, и имеем дело с ними, тем самым полностью опустошая порт, прежде чем мы вернемся к фреймворку. Я также немного изменил ваш код для использования CcrServiceBase
, что упрощает синтаксис некоторых задач, которые вы выполняете:
internal class Test:CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest() {
SpawnIterator(ProcessInts);
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 1500; ++i)
intPort.Post(counter++);
}
,
null,
0,
1000);
}
public IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue = intPort;
ReportCurrent(currentValue);
while(intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
В качестве альтернативы, вы можете полностью отказаться от итератора, поскольку он не очень хорошо используется в вашем примере (хотя я не совсем уверен, как это повлияет на порядок обработки):
internal class Test : CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest()
{
Activate(
Arbiter.Receive(true,
intPort,
i =>
{
ReportCurrent(i);
int currentValue;
while (intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}));
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 500000; ++i)
{
intPort.Post(counter++);
}
}
,
null,
0,
1000);
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
Оба этих примера значительно увеличивают пропускную способность на порядки. Надеюсь, это поможет.