Простое распараллеливание для hashset - PullRequest
1 голос
/ 27 апреля 2020

У меня есть 2 цикла (вложенных), пытающихся выполнить простое распараллеливание

псевдокод :

for item1 in data1 (~100 million row)
    for item2 in data2 (~100 rows)
        result = process(item1,item2) // couple of if conditions
        hashset.add(result) // while adding, incase of a duplicate i also decide wihch one to retain

process(item1,item2), если быть точным, имеет 4 условия основывается на значениях в элементах item1 и item2 (время выполнения составляет менее 50 мс)

data1 размер - Nx17
data2 размер - Nx17
result размер - 1x17 (результат объединен в строку, прежде чем она будет добавлена ​​в хэш-набор)

максимальный размер вывода: неизвестно, но я хотел бы быть готовым по крайней мере 500 миллионов, что означает, что хэш-сет будет содержать 500 миллионов элементов. (как обращаться с таким большим количеством данных в хэш-наборе, это другой вопрос, который я предполагаю)

Должен ли я просто использовать concurrent hashset, чтобы сделать его безопасным для потоков, и go с parallel.each или i go с TASK концепцией

Пожалуйста, предоставьте несколько примеров кода на основе вашего мнения.

Ответы [ 2 ]

3 голосов
/ 27 апреля 2020

Ответ во многом зависит от стоимости process(data1, data2). Если это операция с интенсивным использованием процессора, то вы, безусловно, можете воспользоваться Parallel.ForEach. Конечно, вы должны использовать параллельный словарь или блокировать таблицу ha sh. Вы должны сравнить, чтобы увидеть, что работает лучше для вас. Если process слишком мало влияет на производительность, то, вероятно, вы ничего не получите от распараллеливания - блокировка на хеш-таблице убьет все это.

Вы также должны попытаться увидеть, перечисляет ли data2 на внешнем l oop также быстрее. Это может дать вам еще одно преимущество - вы можете иметь отдельную хеш-таблицу для каждого экземпляра данных2, а затем объединить результаты в одну хеш-таблицу. Это позволит избежать блокировок.

Опять же, вам нужно выполнить свои тесты, здесь нет универсального ответа.

1 голос
/ 27 апреля 2020

Мое предложение состоит в том, чтобы отделить обработку данных от сохранения результатов в HashSet, потому что первое является распараллеливаемым, а второе - нет. Вы можете достичь этого разделения с помощью шаблона производитель-потребитель, используя BlockingCollection и потоки (или задачи). Но я покажу решение с использованием более специализированного инструмента, библиотеки TPL Dataflow . Я предполагаю, что данные представляют собой два массива целых чисел, и функция обработки может выдавать до 500 000 000 различных результатов:

var data1 = Enumerable.Range(1, 100_000_000).ToArray();
var data2 = Enumerable.Range(1, 100).ToArray();

static int Process(int item1, int item2)
{
    return unchecked(item1 * item2) % 500_000_000;
}

В конвейере потока данных будет два блока. Первый блок - это TransformBlock, который принимает элемент из массива data1, обрабатывает его со всеми элементами массива data2 и возвращает пакет результатов (как int). массив).

var processBlock = new TransformBlock<int, int[]>(item1 =>
{
    int[] batch = new int[data2.Length];
    for (int j = 0; j < data2.Length; j++)
    {
        batch[j] = Process(item1, data2[j]);
    }
    return batch;
}, new ExecutionDataflowBlockOptions()
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 3 // Configurable
});

Второй блок - это и ActionBlock, который получает обработанные партии из первого блока и добавляет отдельные результаты в HashSet.

var results = new HashSet<int>();
var saveBlock = new ActionBlock<int[]>(batch =>
{
    for (int i = 0; i < batch.Length; i++)
    {
        results.Add(batch[i]);
    }
}, new ExecutionDataflowBlockOptions()
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 1 // Mandatory
});

Строка ниже связывает два блока вместе, так что данные будут автоматически перетекать из первого блока во второй:

processBlock.LinkTo(saveBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });

Последний шаг заключается в подаче первого блока с помощью элементы массива data1 и дождитесь завершения всей операции.

for (int i = 0; i < data1.Length; i++)
{
    processBlock.SendAsync(data1[i]).Wait();
}
processBlock.Complete();
saveBlock.Completion.Wait();

HashSet содержит теперь результаты.

Примечание об использовании BoundedCapacity вариант. Эта опция управляет потоком данных, так что быстрый блок в восходящем направлении не будет заполнен данными, а медленный блок в обратном направлении. Правильная настройка этого параметра увеличивает эффективность использования памяти и ЦП конвейера.

Библиотека потоков данных TPL встроена в ядро. NET и доступна в виде пакета для. NET Framework.

...