Ну, я не уверен на 100%, что вы пытаетесь архивировать здесь. Вы пытаетесь просто снять все предметы, пока ничего не осталось? Или просто снять много предметов за один раз?
Первое, вероятно, неожиданное поведение начинается с этого утверждения:
theQueue.AsParallel()
Для ConcurrentQueue вы получаете 'Snapshot'-Enumerator. Поэтому, когда вы перебираете параллельный стек, вы перебираете только снимок, а не «живую» очередь.
В общем, я думаю, что не стоит перебирать то, что вы меняете во время итерации.
Таким образом, другое решение будет выглядеть так:
// this way it's more clear, that we only deque for theQueue.Count items
// However after this, the queue is probably not empty
// or maybe the queue is also empty earlier
Parallel.For(0, theQueue.Count,
new ParallelOptions() {MaxDegreeOfParallelism = 20},
() => {
theQueue.TryDequeue(); //and stuff
});
Это позволяет избежать манипуляций с чем-либо во время итерации по нему. Однако после этого оператора очередь все еще может содержать данные, которые были добавлены во время цикла for.
Чтобы освободить очередь на мгновение, вам, вероятно, потребуется немного больше работы. Вот действительно уродливое решение. Пока в очереди есть еще элементы, создайте новые задачи. Каждое начало задачи делает очередь из очереди как можно дольше. В конце мы ждем окончания всех задач. Чтобы ограничить параллелизм, мы никогда не создаем более 20 задач.
// Probably a kitty died because of this ugly code ;)
// However, this code tries to get the queue empty in a very aggressive way
Action consumeFromQueue = () =>
{
while (tt.TryDequeue())
{
; // do your stuff
}
};
var allRunningTasks = new Task[MaxParallism];
for(int i=0;i<MaxParallism && tt.Count>0;i++)
{
allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);
}
Task.WaitAll(allRunningTasks);