Потеря элементов где-нибудь в C # BlockingCollection с GetConsumingEnumerable () - PullRequest
3 голосов
/ 21 октября 2019

Я пытаюсь выполнить параллельную SqlBulkCopy для нескольких целей по глобальной сети, многие из которых могут иметь медленные соединения и / или прерывания соединения;скорость их соединения варьируется от 2 до 50 мегабит, и я посылаю соединение с 1000 мбит;многим целям требуется несколько повторных попыток для правильного завершения.

В настоящее время я использую Parallel.ForEach для GetConsumingEnumerable() из BlockingCollection (queue);однако я либо наткнулся на какую-то ошибку, либо у меня возникли проблемы с полным пониманием ее цели, либо я просто что-то неправильно понял. Код никогда не вызывает метод CompleteAdding() коллекции блокировки, кажется, что где-то в цикле параллельного foreach некоторыеиз целей теряются. Даже если существуют разные подходы к этому, и независимо от того, какую работу он выполняет в цикле, блокирующая коллекция не должна вести себя так, как в этом примере, не так ли?

В цикле foreach, Я делаю работу и добавляю цель в results -коллекцию, если она успешно выполнена, или повторно добавляю цель в BlockingCollection в случае ошибки, пока цель не достигнет максимального порога повторных попыток;в этот момент я добавляю его в коллекцию results.

В дополнительной задаче я зацикливаюсь, пока счет коллекции results не станет равным начальному количеству целей;затем я делаю CompleteAdding() для коллекции блокировок.

Я уже пытался использовать объект блокировки для операций с коллекцией results (вместо этого используется List<int>) и очереди, но безуспешно, но это не должно быть необходимо в любом случае. Я также попытался добавить повторы в отдельную коллекцию и повторно добавить их в коллекцию BlockingCollection в другой задаче, а не в параллельный .foreach. Просто для удовольствия я также попытался скомпилировать с .NET с 4.5 до 4.8 и различные версии языка C #.

Вот упрощенный пример:

List<int> targets = new List<int>();
for (int i = 0; i < 200; i++)
{
    targets.Add(0);
}

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
ConcurrentBag<int> results = new ConcurrentBag<int>();
targets.ForEach(f => queue.Add(f));

// Bulkcopy in die Filialen:
Task.Run(() =>
    {
        while (results.Count < targets.Count)
        {
            Thread.Sleep(2000);
            Console.WriteLine($"Completed: {results.Count} / {targets.Count} | queue: {queue.Count}");
        }
        queue.CompleteAdding();
    });

int MAX_RETRIES = 10;
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 50 };

Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
    {
        try
        {
            // simulate a problem with the bulkcopy:
            throw new Exception();
            results.Add(target);
        }
        catch (Exception)
        {
            if (target < MAX_RETRIES)
            {
                target++;
                if (!queue.TryAdd(target))
                    Console.WriteLine($"{target.ToString("D3")}: Error, can't add to queue!");
            }
            else
            {
                results.Add(target);
                Console.WriteLine($"Aborted after {target + 1} tries | {results.Count} / {targets.Count} items finished.");
            }

        }
    });

Я ожидал, что счетчик results -collection является точным счетом targets -листов в конце, но, похоже, он никогда не достигнет этого числа, в результате чего BlockingCollection никогда не будет помечен как завершенный, поэтому код никогда не завершится.

Я действительно не понимаю, почему не все цели в конечном итоге добавляются в коллекцию results! Добавленное количество всегда изменяется и в основном просто не соответствует ожидаемому окончательному числу.

РЕДАКТИРОВАТЬ: я удалил повторную часть и заменил ConcurrentBag простым int-counter, и он все еще не работаетбольшую часть времени:

List<int> targets = new List<int>();
for (int i = 0; i < 500; i++)
    targets.Add(0);

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
//ConcurrentBag<int> results = new ConcurrentBag<int>();
int completed = 0;
targets.ForEach(f => queue.Add(f));

var thread = new Thread(() =>
{
    while (completed < targets.Count)
    {
        Thread.Sleep(2000);
        Console.WriteLine($"Completed: {completed} / {targets.Count} | queue: {queue.Count}");
    }
    queue.CompleteAdding();
});
thread.Start();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
    Interlocked.Increment(ref completed);
});

Ответы [ 2 ]

1 голос
/ 21 октября 2019

Parallel.ForEach предназначен для параллелизма данных (т. Е. Обработки строк по 100 КБ с использованием всех 8 ядер), а не одновременных операций. По сути, это проблема pub / sub и async , если не проблема конвейера. В этом случае CPU ничего не может сделать, просто запустите асинхронные операции и дождитесь их завершения.

.NET обрабатывает это, начиная с .NET 4.5 через классы Dataflow, а в последнее время - пространство имен нижнего уровня System.Threading.Channel.

В простейшей форме вы можете создать ActionBlock <> , который принимает буфер и целевое соединение и публикует данные. Допустим, вы используете этот метод для отправки данных на сервер:

async Task MyBulkCopyMethod(string connectionString,DataTable data)
{
    using(var bcp=new SqlBulkCopy(connectionString))
    {
        //Set up mappings etc.
        //....
        await bcp.WriteToServerAsync(data);   
    }
}

Вы можете использовать это с классом ActionBlock с настроенной степенью параллелизма. Классы потока данных, такие как ActionBlock, имеют свои собственные входные и, при необходимости, выходные буферы, поэтому нет необходимости создавать отдельную очередь:

class DataMessage
{
    public string Connection{get;set;}
    public DataTable Data {get;set;} 
}

...

var options=new ExecutionDataflowBlockOptions { 
                    MaxDegreeOfParallelism = 50,
                    BoundedCapacity = 8
            };
var block=new ActionBlock<DataMessage>(msg=>MyBulkCopyMethod(msg.Connection,msg.Data, options);

Мы можем начатьотправка сообщений в блок сейчас. Установив емкость на 8, мы гарантируем, что входной буфер не будет заполняться большими сообщениями, если блок слишком медленный. MaxDegreeOfParallelism контролирует, как могут выполняться операции одновременно. Допустим, мы хотим отправить одни и те же данные на многие серверы:

var data=.....;
var servers=new[]{connString1, connString2,....};
var messages= from sv in servers
              select new DataMessage{ ConnectionString=sv,Table=data};

foreach(var msg in messages)
{
    await block.SendAsync(msg);
}
//Tell the block we are done
block.Complete();
//Await for all messages to finish processing
await block.Completion;

Retries

Одна из возможностей повторных попыток - использовать цикл повторения в рабочей функции. Лучше было бы использовать другой блок и публиковать там сообщения о сбоях.

var block=new ActionBlock<DataMessage>(async msg=> {
    try {
        await MyBulkCopyMethod(msg.Connection,msg.Data, options);
    }
    catch(SqlException exc) when (some retry condition)
    {
        //Post without awaiting
        retryBlock.Post(msg);
    });

Когда исходный блок завершится, мы хотим сказать, чтобы блок повторов также завершился, независимо от того, что:

block.Completion.ContinueWith(_=>retryBlock.Complete());

Теперь мы можем дождаться завершения retryBlock.

Этот блок может иметь меньший DOP и, возможно, задержку между попытками:

var retryOptions=new ExecutionDataflowBlockOptions { 
                MaxDegreeOfParallelism = 5
        };
var retryBlock=new ActionBlock<DataMessage>(async msg=>{
    await Task.Delay(1000);
    try {
        await MyBulkCopyMethod(msg.Connection,msg.Data, options);
    }
    catch (Exception ....)
    {
        ...
    }
});

Этот шаблон можно повторить для создания нескольких уровней повторных попыток или различных условий. Он также может быть использован для создания различных приоритетных работников, предоставляя больший DOP высокоприоритетным работникам или большую задержку низкоприоритетным работникам

0 голосов
/ 21 октября 2019

Извините, нашел ответ: разделитель по умолчанию, используемый для blockingcollection и параллельного foreach, - это чанкинг и буферизация, что приводит к тому, что цикл foreach всегда ожидает достаточное количество элементов для следующего чанка. ночь, без обработки последних нескольких предметов!

Итак, вместо:

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
    Interlocked.Increment(ref completed);
});

вам придется использовать:

var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(partitioner, options, target =>
{
    Interlocked.Increment(ref completed);
});
...