При использовании ConcurrentQueue попытка удаления из очереди при параллельном цикле - PullRequest
2 голосов
/ 08 июня 2010

Я использую параллельные структуры данных в моем приложении .NET 4, и у меня есть ConcurrentQueue, к которому добавляются во время обработки через него.

Я хочу сделать что-то вроде:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

когда я выполняю вызовы из базы данных для сохранения данных, я ограничиваю количество одновременных потоков.

Но я ожидаю, что ForAll не выйдет из очереди, и я беспокоюсь о том, чтобы просто сделать

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

, так как нет гарантии, что я выкину правильную.

Итак, как я могу выполнить итерацию по сбору и удалению из очереди, параллельно.

Или было бы лучше использовать PLINQ для параллельной обработки?

Ответы [ 2 ]

4 голосов
/ 10 июня 2010

Ну, я не уверен на 100%, что вы пытаетесь архивировать здесь. Вы пытаетесь просто снять все предметы, пока ничего не осталось? Или просто снять много предметов за один раз?

Первое, вероятно, неожиданное поведение начинается с этого утверждения:

 theQueue.AsParallel()

Для ConcurrentQueue вы получаете 'Snapshot'-Enumerator. Поэтому, когда вы перебираете параллельный стек, вы перебираете только снимок, а не «живую» очередь.

В общем, я думаю, что не стоит перебирать то, что вы меняете во время итерации.

Таким образом, другое решение будет выглядеть так:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

Это позволяет избежать манипуляций с чем-либо во время итерации по нему. Однако после этого оператора очередь все еще может содержать данные, которые были добавлены во время цикла for.

Чтобы освободить очередь на мгновение, вам, вероятно, потребуется немного больше работы. Вот действительно уродливое решение. Пока в очереди есть еще элементы, создайте новые задачи. Каждое начало задачи делает очередь из очереди как можно дольше. В конце мы ждем окончания всех задач. Чтобы ограничить параллелизм, мы никогда не создаем более 20 задач.

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
0 голосов
/ 27 августа 2010

Если вы стремитесь достичь высокого уровня по всему реальному сайту и вам не нужно делать немедленные обновления БД, вам будет гораздо лучше использовать очень консервативное решение, чем библиотеки дополнительных слоев.

Makeмассив фиксированного размера (предполагаемый размер - скажем, 1000 элементов или N секунд) и блокированный индекс, чтобы запросы просто помещали данные в слоты и возвращали их.Когда один блок будет заполнен (продолжайте проверять счетчик), создайте другой и вызовите асинхронный делегат spawn для обработки и отправки в SQL только что заполненный блок.В зависимости от структуры ваших данных делегируемый может упаковать все данные в массивы, разделенные запятыми, может быть, даже в простой XML-файл (конечно, нужно протестировать его) и отправить их в SQL sproc, который должен дать возможность обрабатывать их записипо записи - никогда не держит большой замок.Если он становится тяжелым, вы можете разбить свой блок на несколько меньших блоков.Ключевым моментом является то, что вы минимизировали количество запросов к SQL, всегда сохраняли одну степень разделения и даже не должны были платить цену за пул потоков - вам, вероятно, не нужно будет использовать более двух асинхронных потоков вообще.

Это будет намного быстрее, чем возиться с Parallel-s.

...