У меня есть набор работников, которым я хотел бы обработать некоторые данные, чтобы данные были добавлены в очередь (блок буфера). После того, как работник обработает данные, я бы хотел, чтобы рабочий элемент вернулся в очередь. Я написал программу ниже, но кажется, что рабочие не собирают данные равномерно, какие-либо предложения о том, как это исправить? Правильно ли я использую этот шаблон потоков?
например. в выводе вижу
работник 1, работа 66, numjobs 32651, сумма 2154966
работник 0, работа 16,
numjobs 32637, сумма 522192
работник 1, работа 61, numjobs 32675, сумма
1993175
работник 0, работа 72, numjobs 32649, сумма 2350728
работник
1, работа 95, numjobs 32688, сумма 3105360
работник 0, работа 86, numjobs
32663, сумма 2809018
работник 1, работа 0, numjobs 32649, сумма 0
<- это не ожидалось <br>работник 0, работа 98, numjobs 32673, сумма
3201954
работник 1, работа 74, numjobs 32649, сумма 2416026
работник
0, работа 93, numjobs 32675, сумма 3038775
работник 1, работа 7, numjobs
32702, сумма 228914
работник 0, работа 42, numjobs 32642, сумма
1370964
работник 1, работа 32, numjobs 32708, сумма 1046656
работник
0, работа 99, numjobs 32693, сумма 3236607
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
using System.Collections.Generic;
namespace testworkers
{
class WorkerArguments
{
public int i;
public int sum;
public int numjobs;
public WorkerArguments()
{
}
}
class Program
{
static void Main(string[] args)
{
var cancelSignal = new CancellationTokenSource();
var buffer = new BufferBlock<WorkerArguments>(new DataflowBlockOptions() { BoundedCapacity = 5000, CancellationToken = cancelSignal.Token });
List<string> splunkProdUsers = new List<string>(new string[] { "worker1", "worker2", "worker3", "worker4" });
List<Task<int>> consumers = new List<Task<int>>();
for (int i = 0; i < splunkProdUsers.Count; i++)
{
var consumer = Worker(i, buffer, buffer, cancelSignal.Token);
consumers.Add(consumer);
}
for (int j = 0; j < 100; j++)
{
buffer.Post(new WorkerArguments { i = j });
}
for (int i = 0; i < consumers.Count; i++)
{
consumers[i].Wait(cancelSignal.Token);
}
}
static async Task<int> Worker(int workerId, ISourceBlock<WorkerArguments> source, ITargetBlock<WorkerArguments> target, CancellationToken cancelToken)
{
int workProcessed = 0;
while (await source.OutputAvailableAsync())
{
WorkerArguments workArguments = source.Receive(cancelToken);
Console.WriteLine("worker {0}, work {1}, numjobs {2}, sum {3}", workerId, workArguments.i, workArguments.numjobs, workArguments.sum);
WorkerArguments nextWorkArguments = new WorkerArguments();
nextWorkArguments.i = workArguments.i;
nextWorkArguments.sum = workArguments.sum + workArguments.i;
nextWorkArguments.numjobs = workArguments.numjobs + 1;
target.Post(nextWorkArguments);
}
return workProcessed;
}
}
}