Я пытаюсь написать простое приложение производителя-потребителя, в котором мне нужно читать данные кусками из файла (который может быть огромным) и (для простых целей тестирования) просто записывать их в другой файл через другой поток.
Я пытался следить за множеством онлайн-источников, но мне сложно понять эти задачи синхронизации потоков, и каждый из найденных примеров упустил для меня некоторые важные аспекты.
Я собрал куски кода, которые ПОЧТИ кажутся работающими , но есть что-то связанное с потоками, которое явно неверно, поэтому я хотел попросить вас о помощи, если кто-нибудь может заметить, что я делаю неправильно. Если я запускаю приведенную ниже программу для некоторого тестового файла, программа завершает свою работу ОК (по крайней мере для меня и моего тестового файла), но , если я раскомментирую Thread.Sleep(20)
в методе dequeueObjectAndWriteItToFile
( чтобы проверить, что происходит, когда производитель быстрее, чем потребитель), затем (на основе данных, напечатанных в консоли), производитель вставляет блоки данных maxQueueSize + 1 в очередь, и программа попадает в бесконечное число l * 1040. * или что-то .
Я подозреваю, что _producerThreadWaitEventHandler.Set()
вызов может быть частью проблемы, потому что на данный момент он вызывается в dequeueObjectAndWriteItToFile
для каждого пока l oop (я хотел бы вызвать его только в случае необходимости, т. е. если был вызван _producerThreadWaitEventHandler.waitOne()
, и я должен разбудить этот поток, но я не знаю как узнать, вызван ли для определенного потока waitOne или нет, чтобы разбудить поток ). Конечно, могут быть и другие проблемы с синхронизацией, но, поскольку я новичок в многопоточности, я не знаю, где сначала искать и какое было бы лучшее решение.
Обратите внимание, я хочу использовать (и понимаю ) основные методы c (такие как Monitor или AutoResetEvent ) для синхронизации (вместо BlockingQueue, TPL et c.), поэтому я надеюсь, что некоторые незначительные изменения в приведенном ниже коде
Буду благодарен за любую подсказку.
Спасибо.
using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
class ProducerConsumerApp : IDisposable
{
public static string originalFilePath = @"D:\test.txt";
public static string outputFilePath = @"D:\test_export.txt";
public static int blockSize = 15;
int maxQueueSize = 4; // max allowed number of objects in the queue
EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);
Thread _consumerThread;
readonly object _lock = new object();
Queue<byte[]> _queue = new Queue<byte[]>();
public ProducerConsumerApp(Stream outputStream)
{
_consumerThread = new Thread(dequeueObjectAndWriteItToFile);
_consumerThread.Start(outputStream);
}
public void enqueueObject(byte[] data)
{
lock (_lock)
{
// TODO !!!
// Make sure producent doesn't enqueue more objects than the maxQueueSize is,
// i.e. let the producent wait until consumer dequeues some object from the full queue
if (_queue.Count > maxQueueSize) // would "while" be better? Doesn't seem to change anything
{
_producerThreadWaitEventHandler.WaitOne();
}
// Thread.Sleep(20); // just for testing
_queue.Enqueue(data);
// data being read in case of a text file:
//string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
//Console.WriteLine("Enqueuing data: "+str);
}
_consumerThreadWaitEventHandler.Set(); // data enqueued => wake the consumerThread
}
public void Dispose() // called automatically (IDisposable implementer) when instance is being destroyed
{
enqueueObject(null); // Signal the consumer to exit.
_consumerThread.Join(); // Wait for the consumer's thread to finish.
_consumerThreadWaitEventHandler.Close(); // Release any OS resources.
}
void dequeueObjectAndWriteItToFile(object outputStream)
{
while (true)
{
// Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
// PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?
byte[] data = null;
lock (_lock)
if (_queue.Count > 0) // queue not empty
{
data = _queue.Dequeue();
_producerThreadWaitEventHandler.Set();
// !!! This doesn't seem right - I don't want to call this in each while iteration
// I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called
// but how to check such a condition?
if (data == null)
{
// Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
return;
}
}
if (data != null)
{
((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file
// just a test in case of a text file:
// string str = System.Text.Encoding.Default.GetString(data);
// Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);
} else { // empty queue => let the consumerThread wait
_consumerThreadWaitEventHandler.WaitOne(); // No more tasks - wait for a signal
}
}
}
static void Main()
{
FileInfo originalFile = new FileInfo(originalFilePath);
byte[] data = new byte[blockSize];
int bytesRead;
using (FileStream originalFileStream = originalFile.OpenRead()) // file input stream
using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
{
while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0) // reads blocks of data from a file
{
// test - in case of a text file:
//string str = System.Text.Encoding.Default.GetString(data);
//Console.WriteLine("data block read from a file:" + str);
if (bytesRead < data.Length)
{
byte[] data2 = new byte[bytesRead];
Array.Copy(data, data2, bytesRead);
data = data2;
}
q.enqueueObject(data); // put the data into the queue
data = new byte[blockSize];
}
}
// because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread
Console.WriteLine("Finish");
}
}