Как оптимизировать слияние элементов параллельно - PullRequest
0 голосов
/ 03 июля 2019

У нас есть следующая проблема.Мы разбираем файлы (производитель) и конвертируем данные в формат ac # data.После этого нам нужно объединить все эти данные вместе.Поскольку это может быть сделано параллельно, мы начинаем реализовывать шаблон потребителя производителя, но немного застряли в том, как оптимизировать результаты объединения.

Производитель производит 5 элементов данных (названных следующим образом):

1, 2, 3, 4, 5

Слияние, которое будет выполнено, но порядок не имеет значения.Как только будут созданы 2 элемента, они могут быть объединены.

Пример:

(1) и (2), (3) и (4), (12) и (34)), (1234) и (5)

Data data = new Data();
BlockingCollection<Data> collection = new BlockingCollection<Data>();
Task consumer = Task.Factory.StartNew(() =>
{
    while (!collection.IsCompleted)
    {
        var item = collection.Take();
        data.Merge(item);
    }
});
Task producer = Task.Factory.StartNew(() =>
{
    Parallel.ForEach(files, file =>
    {
        collection.Add(new Data(file));
    });
    collection.CompleteAdding();
});
Task.WaitAll(consumer, producer);
//here we got the data merged with all files
return data;

Этот код работает, но имеет проблему.В нашем случае производитель намного быстрее, чем потребитель.Таким образом, нам нужны параллельные потребители, которые ждут двух элементов в очереди.Затем они должны взять их, объединить их и вернуть в очередь.Существует ли какой-либо известный шаблон для такой проблемы слияния?

1 Ответ

0 голосов
/ 03 июля 2019

Мы нашли для этого неплохое решение.

Data data = new Data();
BlockingCollection<Data> collection = new BlockingCollection<Data>();
List<Task<Data>> tasks = new List<Task<Data>>();
Enumerable.Range(0, 5).ForEach(t => {
    Task consumer = Task.Factory.StartNew(() =>
    {
        Data result = null;
        foreach (Data data in collection.GetConsumingEnumerable())
        {
            if (result == null)
            {
                result = data;
            }
            else
            {
                result.Merge(data);
            }
        }
        return result;
    });
    tasks.Add(consumer);
});
Task producer = Task.Factory.StartNew(() =>
{
    Parallel.ForEach(files, file =>
    {
        collection.Add(new Data(file));
    });
    collection.CompleteAdding();
});
Task.WaitAll(consumers.Concat(new []{ producer}));
List<Data> datas = consumers.Select(t => t.Result).Where(t => t != null).ToList();
Data finalResult = datas.First();
foreach (Data toBeMerged in datas.Skip(1))
{
    finalResult.Merge(toBeMerged);
}
return finalResult;
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...