Использование очереди для передачи данных между двумя BackgroundWorkers - PullRequest
1 голос
/ 17 мая 2019

Я читаю и декодирую двоичный файл, побайтово. Для этого я использую два BackgroundWorker s: один для чтения файла, который производит List<byte> переменного размера для каждой «строки» моего файла, и один для обработки «строк».

Поскольку я хочу, чтобы они запускались в parralel, и я не знаю, какой из них будет быстрее другого, я использую Queue для передачи данных между двумя BackgroundWorker с.

Вот в чем дело: ни один из List<byte> не должен содержать значение 0 в любой момент. Я проверяю, что перед добавлением их в очередь. Тем не менее, на другом конце Queue некоторые списки содержат значения 0. Тем не менее, я создаю новый List<byte> при каждом вызове Dequeue(), поскольку, по-видимому, если я этого не сделаю, данные будут изменены до завершения обработки.

Я пытался вручную создать новый объект List<byte> и , а затем , присвоив ему результат Dequeue(), без улучшений. я впервые работаю с Queue, и, поскольку мой код многопоточный, шаг за шагом почти невозможно отладить.

Queue<List<byte>> q = new Queue<List<byte>>(); // My FIFO queue

// Reading thread
private void BackgroudWorkerRead_DoWork(object sender, DoWorkEventArgs e)
{
      // ... read the file
      List<byte> line_list = new List<byte>();
      // ... filling line_list with data
      // in this part I check that no byte added to line_list has the value 0, or else I display an errror message and end the process
      q.Enqueue(line_list);
      if (!backgroundWorkerNewLine.IsBusy) backgroundWorkerNewLine.RunWorkerAsync(); // if the other BackgroundWorker isn't processing data, now it needs to since we just added some to the queue
}

// Processing thread
private void backgroundWorkerNewLine_DoWork(object sender, DoWorkEventArgs e)
{
    while (q.Count > 0) // While there is data to process
    {
          string line_str = DecodeBytes(new List<byte>(q.Dequeue())); // Decoding
          string[] elements = line_str.Split(separator, StringSplitOptions.None); // Separating values

          Form1.ActiveForm.Invoke(new MethodInvoker(() => AddRow(elements))); // Add the line to a DataTable from the main thread
    }
}

public string DecodeBytes(List<byte> line)
{
 /// ... read each byte and return a string of the whole decoded line
}

public void AddRow(string[] el)
{
    MyDataTable.Rows.Add(el);
}

Кажется, что список, возвращаемый q.Dequeue (), не возвращает те же данные, которые были добавлены q.Enqueue ()

Ответы [ 2 ]

0 голосов
/ 23 мая 2019

При создании многопоточных приложений вы должны быть очень осторожны, чтобы не допустить одновременного доступа разных ресурсов к разным потокам.Если вы не предотвратите это, произойдут плохие вещи.Вы теряете обновления, ваши структуры данных становятся поврежденными, и все это происходит непредсказуемо и непоследовательно.Чтобы избежать этих проблем, вы должны синхронизировать весь доступ к общим ресурсам из разных потоков.Что может быть достигнуто с помощью оператора lock.Поэтому совет: всегда блокируйте, когда вы читаете и , обновляя общие ресурсы.Общий ресурс в вашем случае - Queue.Блокировка должна выполняться следующим образом:

// Reading thread
lock (q)
{
    q.Enqueue(line_list);
}

// Processing thread
while (true)
{
    List<byte> list;
    lock (q)
    {
        if (q.Count == 0) break;
        list = new List<byte>(q.Dequeue());
    }
    string line_str = DecodeBytes(list); // Decoding
    // ...

Недостатком блокировки является то, что она создает конфликт, поэтому вам не следует блокировать больше, чем это абсолютно необходимо.Особенно избегайте сложных вычислений, удерживая блокировку.

Кроме того, шаблон, который вы пытаетесь реализовать, - это шаблон производитель-потребитель, и .NET предлагает специализированный класс для упрощения этого шаблона.Это класс BlockingCollection, и он обрабатывает всю эту грязную синхронизацию потоков для вас.Это может помочь вам сократить код, который вы должны написать, за счет небольшой кривой обучения.В основном вам нужно изучить методы Add, CompleteAdding и GetConsumingEnumerable, и вы готовы к работе.

0 голосов
/ 23 мая 2019

Вы должны использовать Microsoft Reactive Framework (он же Rx) - NuGet System.Reactive.Windows.Forms (при условии, что вы пишете приложение WinForms) и добавить using System.Reactive.Linq;.

Rx позволяет вам использовать знакомый синтаксис LINQ для работы с параллельными операциями.

Вы не показали нам, как разбиваете файл на список List<byte>, поэтому я предполагаю, что у вас есть метод, который выглядит как IObservable<List<byte>> DeconstructFile(FileInfo fileInfo).

Теперь вы можете сделать это:

IObservable<string[]> query =
    from bytes in DeconstructFile(new FileInfo("myFile.bin"))
    from line_str in Observable.Start(() => DecodeBytes(bytes))
    select line_str.Split(separator, StringSplitOptions.None);

IDisposable subscription =
    query
        .ObserveOn(Form1.ActiveForm)
        .Subscribe(el => MyDataTable.Rows.Add(el));

Вот и все. Он работает параллельно, Observable.Start запускает новые потоки по мере необходимости и автоматически передает результаты на каждый шаг. .ObserveOn(Form1.ActiveForm) автоматически маршаллизирует .Subscribe к потоку пользовательского интерфейса.

Если вам нужно остановить код до его завершения, просто позвоните subscription.Dispose(). Простой.

...