Проблема синхронизации производитель-потребитель - PullRequest
2 голосов
/ 27 февраля 2020

Я пытаюсь написать простое приложение производителя-потребителя, в котором мне нужно читать данные кусками из файла (который может быть огромным) и (для простых целей тестирования) просто записывать их в другой файл через другой поток.

Я пытался следить за множеством онлайн-источников, но мне сложно понять эти задачи синхронизации потоков, и каждый из найденных примеров упустил для меня некоторые важные аспекты.

Я собрал куски кода, которые ПОЧТИ кажутся работающими , но есть что-то связанное с потоками, которое явно неверно, поэтому я хотел попросить вас о помощи, если кто-нибудь может заметить, что я делаю неправильно. Если я запускаю приведенную ниже программу для некоторого тестового файла, программа завершает свою работу ОК (по крайней мере для меня и моего тестового файла), но , если я раскомментирую Thread.Sleep(20) в методе dequeueObjectAndWriteItToFile ( чтобы проверить, что происходит, когда производитель быстрее, чем потребитель), затем (на основе данных, напечатанных в консоли), производитель вставляет блоки данных maxQueueSize + 1 в очередь, и программа попадает в бесконечное число l * 1040. * или что-то .

Я подозреваю, что _producerThreadWaitEventHandler.Set() вызов может быть частью проблемы, потому что на данный момент он вызывается в dequeueObjectAndWriteItToFile для каждого пока l oop (я хотел бы вызвать его только в случае необходимости, т. е. если был вызван _producerThreadWaitEventHandler.waitOne(), и я должен разбудить этот поток, но я не знаю как узнать, вызван ли для определенного потока waitOne или нет, чтобы разбудить поток ). Конечно, могут быть и другие проблемы с синхронизацией, но, поскольку я новичок в многопоточности, я не знаю, где сначала искать и какое было бы лучшее решение.

Обратите внимание, я хочу использовать (и понимаю ) основные методы c (такие как Monitor или AutoResetEvent ) для синхронизации (вместо BlockingQueue, TPL et c.), поэтому я надеюсь, что некоторые незначительные изменения в приведенном ниже коде

Буду благодарен за любую подсказку.

Спасибо.

using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;

class ProducerConsumerApp : IDisposable
{
public static string originalFilePath = @"D:\test.txt";
public static string outputFilePath = @"D:\test_export.txt";
public static int blockSize = 15;

int maxQueueSize = 4;  // max allowed number of objects in the queue

EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);

Thread _consumerThread;
readonly object _lock = new object();
Queue<byte[]> _queue = new Queue<byte[]>();

public ProducerConsumerApp(Stream outputStream)
{
    _consumerThread = new Thread(dequeueObjectAndWriteItToFile);
    _consumerThread.Start(outputStream);
}

public void enqueueObject(byte[] data)
{
    lock (_lock)
    {
        // TODO !!!
        // Make sure producent doesn't enqueue more objects than the maxQueueSize is,
        // i.e. let the producent wait until consumer dequeues some object from the full queue
        if (_queue.Count > maxQueueSize)     // would "while" be better? Doesn't seem to change anything
        {
            _producerThreadWaitEventHandler.WaitOne();
        }
        // Thread.Sleep(20); // just for testing

        _queue.Enqueue(data);

        // data being read in case of a text file:
        //string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
        //Console.WriteLine("Enqueuing data: "+str);

    }
    _consumerThreadWaitEventHandler.Set();  // data enqueued  => wake the consumerThread
}

public void Dispose()  // called automatically (IDisposable implementer) when instance is being destroyed
{
    enqueueObject(null);                         // Signal the consumer to exit.
    _consumerThread.Join();                     // Wait for the consumer's thread to finish.
    _consumerThreadWaitEventHandler.Close();    // Release any OS resources.
}

void dequeueObjectAndWriteItToFile(object outputStream)
{
    while (true)
    {
        // Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
        // PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?

        byte[] data = null;
        lock (_lock)
            if (_queue.Count > 0)   // queue not empty
            {
                data = _queue.Dequeue();

                _producerThreadWaitEventHandler.Set();
                // !!! This doesn't seem right - I don't want to call this in each while iteration
                // I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called 
                // but how to check such a condition?

                if (data == null) 
                {                  
                    // Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
                    return;                
                }
            }
        if (data != null)
        {
            ((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file

            // just a test in case of a text file:
            // string str = System.Text.Encoding.Default.GetString(data);
            // Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);

        } else {   // empty queue => let the consumerThread wait
            _consumerThreadWaitEventHandler.WaitOne();  // No more tasks - wait for a signal
        }
    }
}

static void Main()
{

    FileInfo originalFile = new FileInfo(originalFilePath);
    byte[] data = new byte[blockSize];
    int bytesRead;

    using (FileStream originalFileStream = originalFile.OpenRead())    // file input stream
    using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
    using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
    {
        while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0)   // reads blocks of data from a file
        {
            // test - in case of a text file:
            //string str = System.Text.Encoding.Default.GetString(data);
            //Console.WriteLine("data block read from a file:" + str);

            if (bytesRead < data.Length)
            {
                byte[] data2 = new byte[bytesRead];
                Array.Copy(data, data2, bytesRead);
                data = data2;
            }

            q.enqueueObject(data);   // put the data into the queue

            data = new byte[blockSize];
        }
    }
    // because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread

    Console.WriteLine("Finish");
}
}

Ответы [ 2 ]

0 голосов
/ 27 февраля 2020

Это становится намного проще, если вместо этого вы используете BlockingCollection . EG

using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
using System.Collections.Concurrent;

class ProducerConsumerApp : IDisposable
{
    public static int blockSize = 15;

    const int maxQueueSize = 4;  // max allowed number of objects in the queue

    BlockingCollection<byte[]> _queue = new BlockingCollection<byte[]>(maxQueueSize);
    private Thread _consumerThread;

    public ProducerConsumerApp(Stream outputStream)
    {
        _consumerThread = new Thread(dequeueObjectAndWriteItToFile);
        _consumerThread.Start(outputStream);
    }

    public void enqueueObject(byte[] data)
    {
        _queue.Add(data);
    }

    public void Dispose()  // called automatically (IDisposable implementer) when instance is being destroyed
    {
        enqueueObject(null);                         // Signal the consumer to exit.
        _consumerThread.Join();                     // Wait for the consumer's thread to finish.
    }

    void dequeueObjectAndWriteItToFile(object outputStream)
    {
        var outStream = (FileStream)outputStream;
        while (true)
        {
            var data = _queue.Take();
            if (data == null)
            {
                outStream.Close();
                // Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
                return;
            }
            outStream.Write(data, 0, data.Length); // write data from the queue to a file
        }
    }

    static void Main()
    {
        var originalFilePath = @"c:\temp\test.txt";
        var outputFilePath = @"c:\temp\test_out.txt";

        FileInfo originalFile = new FileInfo(originalFilePath);
        byte[] data = new byte[blockSize];
        int bytesRead;

        using (FileStream originalFileStream = originalFile.OpenRead())    // file input stream
        using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
        using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
        {
            while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0)   // reads blocks of data from a file
            {
                // test - in case of a text file:
                //string str = System.Text.Encoding.Default.GetString(data);
                //Console.WriteLine("data block read from a file:" + str);

                if (bytesRead < data.Length)
                {
                    byte[] data2 = new byte[bytesRead];
                    Array.Copy(data, data2, bytesRead);
                    data = data2;
                }

                q.enqueueObject(data);   // put the data into the queue

                data = new byte[blockSize];
            }
        }
        // because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread

        Console.WriteLine("Finish");
    }
}
0 голосов
/ 27 февраля 2020

Вы проблема в том, что вы ждете в замке. Это означает, что другой поток также будет блокировать оператор блокировки и никогда не вызовет _producerThreadWaitEventHandler.Set(); Классический тупик.

Вам лучше использовать Семафор , чтобы ограничить количество элементов, которые может поместить продукт. очередь.
Инициализируйте семафор как свободный: producerSemaphore = new Semaphore (15, 15);. В производителе дождитесь семафора, а в потребителе - позвоните Release().

Таким же образом вы можете использовать семафор или CountdownEvent , чтобы не полагаться на queue.Count.

Еще лучше, вы можете использовать ConcurrentQueue в сочетании с семафором, чтобы убедиться, что производитель не переполняет очередь. Если вы успешно удалили элемент из очереди, позвоните producerSemaphore.Release();.

...