Мое предложение состоит в том, чтобы отделить обработку данных от сохранения результатов в HashSet
, потому что первое является распараллеливаемым, а второе - нет. Вы можете достичь этого разделения с помощью шаблона производитель-потребитель, используя BlockingCollection
и потоки (или задачи). Но я покажу решение с использованием более специализированного инструмента, библиотеки TPL Dataflow . Я предполагаю, что данные представляют собой два массива целых чисел, и функция обработки может выдавать до 500 000 000 различных результатов:
var data1 = Enumerable.Range(1, 100_000_000).ToArray();
var data2 = Enumerable.Range(1, 100).ToArray();
static int Process(int item1, int item2)
{
return unchecked(item1 * item2) % 500_000_000;
}
В конвейере потока данных будет два блока. Первый блок - это TransformBlock
, который принимает элемент из массива data1
, обрабатывает его со всеми элементами массива data2
и возвращает пакет результатов (как int
). массив).
var processBlock = new TransformBlock<int, int[]>(item1 =>
{
int[] batch = new int[data2.Length];
for (int j = 0; j < data2.Length; j++)
{
batch[j] = Process(item1, data2[j]);
}
return batch;
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 3 // Configurable
});
Второй блок - это и ActionBlock
, который получает обработанные партии из первого блока и добавляет отдельные результаты в HashSet
.
var results = new HashSet<int>();
var saveBlock = new ActionBlock<int[]>(batch =>
{
for (int i = 0; i < batch.Length; i++)
{
results.Add(batch[i]);
}
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 1 // Mandatory
});
Строка ниже связывает два блока вместе, так что данные будут автоматически перетекать из первого блока во второй:
processBlock.LinkTo(saveBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
Последний шаг заключается в подаче первого блока с помощью элементы массива data1
и дождитесь завершения всей операции.
for (int i = 0; i < data1.Length; i++)
{
processBlock.SendAsync(data1[i]).Wait();
}
processBlock.Complete();
saveBlock.Completion.Wait();
HashSet
содержит теперь результаты.
Примечание об использовании BoundedCapacity
вариант. Эта опция управляет потоком данных, так что быстрый блок в восходящем направлении не будет заполнен данными, а медленный блок в обратном направлении. Правильная настройка этого параметра увеличивает эффективность использования памяти и ЦП конвейера.
Библиотека потоков данных TPL встроена в ядро. NET и доступна в виде пакета для. NET Framework.