Поток безопасный StreamWriter C # как это сделать?2 - PullRequest
7 голосов
/ 28 августа 2010

Так что это продолжение моего последнего вопроса - поэтому вопрос был «Каков наилучший способ создания программы, которая является поточно-ориентированной, с точки зрения того, что ей нужно записывать двойные значения в файл. Если функция, которая сохраняетзначения через streamwriter вызывается несколькими потоками? Как лучше всего это сделать? "

И я изменил некоторый код, найденный в MSDN, как насчет следующего?Этот правильно записывает все в файл.

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}

Ответы [ 4 ]

13 голосов
/ 28 августа 2010

Thread и QueueUserWorkItem - самые низкие доступные API для потоков .Я бы не использовал их, если бы у меня, наконец, не было другого выбора.Попробуйте класс Task для намного более высокого уровня абстракции.Подробнее см. Мой недавний пост в блоге на эту тему .

. Вы также можете использовать BlockingCollection<double> в качестве правильной очереди производителя / потребителя вместо попытки ее создания.вручную с самыми низкими доступными API для синхронизации .

. Изобретать эти колеса правильно, на удивление сложно.Я настоятельно рекомендую использовать классы, разработанные для этого типа потребностей (Task и BlockingCollection, если быть точным).Они встроены в платформу .NET 4.0, а доступны в качестве дополнения для .NET 3.5 .

.
6 голосов
/ 28 августа 2010
  • код имеет записывающее устройство в качестве экземпляра var, но использует статическую блокировку.Если у вас было несколько экземпляров, записывающих в разные файлы, нет никаких причин, по которым им нужно было бы использовать одну и ту же блокировку
  • для связанной заметки, поскольку у вас уже есть средство записи (как частный экземпляр var), вы можете использовать этодля блокировки вместо использования отдельного объекта locker в этом случае - это делает вещи немного проще.

«Правильный ответ» действительно зависит от того, что вы ищете с точки зрения поведения блокировки / блокировки,Например, самой простой вещью было бы пропустить промежуточную структуру данных, просто имея метод WriteValues, чтобы каждый поток, «сообщающий» о своих результатах, продолжал и записывал их в файл.Что-то вроде:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

Конечно, это означает, что рабочие потоки сериализуются во время их фаз «отчета о результатах» - в зависимости от характеристик производительности, что может быть вполне приемлемым (5 минут для генерации, 500 мс для записи,например).

На другом конце спектра рабочие потоки будут записывать в структуру данных.Если вы работаете в .NET 4, я бы порекомендовал просто использовать ConcurrentQueue , а не делать это самостоятельно.

Кроме того, вы можете захотеть сделать файл ввода / вывода более крупнымпакетов, чем те, о которых сообщают рабочие потоки, так что вы можете просто делать запись в фоновом потоке с определенной частотой.Этот конец спектра выглядит примерно так (вы бы удалили вызовы Console.WriteLine в реальном коде, они просто есть, чтобы вы могли видеть, как он работает в действии)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
4 голосов
/ 29 августа 2010

То есть вы хотите, чтобы группа потоков записывала данные в один файл с помощью StreamWriter? Легко. Просто заблокируйте объект StreamWriter.

Код здесь создаст 5 потоков. Каждый поток будет выполнять 5 «действий», а в конце каждого действия он будет записывать 5 строк в файл с именем «file».

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

Результатом должен быть файл «файл» с 125 строками, в котором все «действия» выполняются одновременно, а результат каждого действия записывается синхронно в файл.

2 голосов
/ 28 августа 2010

Код, который у вас есть, слегка нарушен - в частности, если рабочий элемент в очереди запускается первым, он сразу же очищает (пустой) список значений перед завершением, после чего ваш работник выходит и заполняет список.(что в конечном итоге будет проигнорировано).Событие автосброса также ничего не делает, поскольку ничего не запрашивает и не ожидает его состояния.

Кроме того, поскольку каждый поток использует различную блокировку , блокировки не имеют смысла!Вы должны быть уверены, что удерживаете одну общую блокировку при доступе к Streamwriter.Вам не нужна блокировка между кодом очистки и кодом генерации;вам просто нужно убедиться, что очистка запускается после завершения генерации.

Вы, вероятно, на правильном пути, хотя - хотя я бы использовал массив фиксированного размера вместо списка и сбрасывал все записииз массива, когда он становится полным.Это исключает возможность нехватки памяти, если поток долгоживущий.

...