Streamwriter, StringBuilder и параллельные циклы - PullRequest
2 голосов
/ 17 февраля 2012

Извините за большой кусок кода, я не мог объяснить это с меньшими затратами. В основном я пытаюсь записать в файл из многих задач.Ребята, скажите, пожалуйста, что я делаю не так?_streamWriter.WriteLine() бросает ArgumentOutOfRangeException.

class Program
{
    private static LogBuilder _log = new LogBuilder();
    static void Main(string[] args)
    {
        var acts = new List<Func<string>>();
        var rnd = new Random();
        for (int i = 0; i < 10000; i++)
        {
            acts.Add(() =>
            {
                var delay = rnd.Next(300);
                Thread.Sleep(delay);
                return "act that that lasted "+delay;
            });
        }

        Parallel.ForEach(acts, act =>
        {
            _log.Log.AppendLine(act.Invoke());
            _log.Write();
        });
    }
}

public class LogBuilder : IDisposable
{
    public StringBuilder Log = new StringBuilder();
    private FileStream _fileStream;
    private StreamWriter _streamWriter;

    public LogBuilder()
    {
        _fileStream = new FileStream("log.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite);
        _streamWriter = new StreamWriter(_fileStream) { AutoFlush = true };
    }
    public void Write()
    {
        lock (Log)
        {
            if (Log.Length <= 0) return;
            _streamWriter.WriteLine(Log.ToString()); //throws here. Although Log.Length is greater than zero
            Log.Clear();
        }
    }

    public void Dispose()
    {
        _streamWriter.Close(); _streamWriter.Dispose(); _fileStream.Close(); fileStream.Dispose();
    }
}

Ответы [ 4 ]

4 голосов
/ 19 февраля 2012

Это не ошибка в StringBuilder, это ошибка в вашем коде. И модификация, которую вы показали в своем последующем ответе (где вы заменяете Log.String на цикл, извлекающий символы по одному за раз), не исправляет это. Он больше не будет генерировать исключение, но он также не будет работать должным образом.

Проблема в том, что вы используете StringBuilder в двух местах многопоточного кода, и одно из них не пытается его заблокировать, а это означает, что чтение может происходить в одном потоке одновременно с записью в другом. В частности, проблема заключается в этой строке:

_log.Log.AppendLine(act.Invoke()); 

Вы делаете это внутри Parallel.ForEach. Здесь вы не предпринимаете никаких попыток синхронизации, даже если это будет выполняться одновременно несколькими потоками. Итак, у вас есть две проблемы:

  1. Несколько вызовов на AppendLine могут выполняться одновременно в нескольких потоках
  2. Один поток может пытаться вызвать Log.ToString одновременно с тем, как один или несколько других потоков вызывают AppendLine

Вы получите только одно чтение за раз, потому что вы используете ключевое слово lock для их синхронизации. Проблема в том, что вы не получаете такую ​​же блокировку при вызове AppendLine.

Ваше «исправление» на самом деле не является исправлением. Вам удалось только усложнить задачу. Теперь это просто пойдет не так по-разному и более тонкими способами. Например, я предполагаю, что ваш метод Write продолжает вызывать Log.Clear после того, как ваш цикл for завершит свою последнюю итерацию. Между завершением этой последней итерации и выполнением вызова Log.Clear возможно, что какой-то другой поток получит другой вызов AppendLine, потому что синхронизация этих вызовов AppendLine отсутствует.

В результате вы будете иногда пропускать вещи. Код будет записывать в конструктор строк вещи, которые затем очищаются, даже не записываясь в потоковую запись.

Кроме того, существует довольно высокая вероятность того, что одновременные вызовы AppendLine вызовут проблемы. Если вам повезет, они будут время от времени падать. (Это хорошо, потому что дает понять, что у вас есть проблема, которую нужно исправить.) Если вам не повезет, вы будете время от времени получать повреждение данных - два потока могут закончить запись в одно и то же место в результате StringBuilder или в беспорядке, или полностью потерянные данные.

Опять же, это не ошибка в StringBuilder. Он не предназначен для поддержки одновременного использования из нескольких потоков. Ваша задача - убедиться, что только один поток за раз делает что-либо для конкретного экземпляра StringBuilder. Как сказано в документации для этого класса, «ни один из членов экземпляра не гарантированно является потокобезопасным».

Очевидно, что вы не хотите удерживать блокировку при вызове act.Invoke (), потому что это, вероятно, та самая работа, которую вы хотите распараллелить. Поэтому я думаю, что-то вроде этого может работать лучше:

string result = act();
lock(_log.Log)
{
    _log.Log.AppendLine(result);
}

Однако, если бы я оставил его там, я бы на самом деле не помогал вам, потому что это выглядит очень неправильно для меня.

Если вы когда-нибудь обнаружите, что блокируете поле в чужом объекте, это является признаком проблемы проектирования в вашем коде. Возможно, было бы более разумно изменить дизайн, чтобы метод LogBuilder.Write принимал строку. Честно говоря, я даже не уверен, почему вы вообще используете StringBuilder здесь, так как вы, кажется, используете его просто как область хранения для строки, которую вы немедленно записываете в потоковую запись. Что вы надеялись добавить сюда StringBuilder? Следующее будет проще и, похоже, ничего не потеряет (кроме исходных ошибок параллелизма):

public class LogBuilder : IDisposable
{
    private readonly object _lock = new object();
    private FileStream _fileStream;
    private StreamWriter _streamWriter;

    public LogBuilder()
    {
        _fileStream = new FileStream("log.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite);
        _streamWriter = new StreamWriter(_fileStream) { AutoFlush = true };
    }
    public void Write(string logLine)
    {
        lock (_lock)
        {
            _streamWriter.WriteLine(logLine);
        }
    }

    public void Dispose()
    {
        _streamWriter.Dispose(); fileStream.Dispose();
    }
}    
1 голос
/ 21 февраля 2012

Я думаю, причина в том, что вы обращаетесь к stringBuilder в скобке Парелла

_log.Log.AppendLine(act.Invoke());
_log.Write();

и внутри LogBuilder вы выполняете lock (), чтобы запретить выделение памяти для stringBuidler. Вы изменяете потоковую запись так, чтобы обрабатывать журнал в каждом символе, поэтому процесс parellel разблокирует выделение памяти для stringBuilder.

Разделение параллельного процесса на отдельные действия, скорее всего, уменьшит проблему

Parallel.ForEach(acts, act =>
{
    _log.Write(act.Invoke());
});

в классе LogBuilder

private readonly object _lock = new object();

public void Write(string logLines)
{
    lock (_lock)
    {
        //_wr.WriteLine(logLines);
        Console.WriteLine(logLines);
    }
}
0 голосов
/ 04 июля 2016

Код можно сделать проще, чем ответ @IanGriffiths, используя TextWriter.Synchronized для создания блокирующей оболочки вокруг StreamWriter.Инфраструктура предоставляет синхронизированные помощники во многих классах для создания потоково-безопасных экземпляров, когда это необходимо.

Я печатаю текущую переменную итерации цикла i, чтобы можно было легко увидеть влияние параллелизма в планировании.Нужно использовать локальную переменную для хранения текущего значения i при запуске задания.Этот метод позволит избежать захвата ссылки на переменную итерации в замыкании, каждый раз печатая переменную вместо желаемых значений 1, 2, 3 и т. Д.

Полный список полезных помощников синхронизации.http://referencesource.microsoft.com/#q=Synchronized.

System.Collections

  • статический ArrayList Synchronized (список ArrayList)
  • статический IList Synchronized (список IList)
  • статическая синхронизация Hashtable (таблица хеш-таблиц)
  • статическая синхронизация очереди (очередь)
  • статическая синхронизация SortedList (список SortedList)
  • статическая синхронизация стека (стек стека)

System.Collections.Generic

  • статический синхронизированный IList (список списка)

System.IO

  • статический Поток Синхронизированный (Поток потока)
  • статический Синхронизированный TextReader (читатель TextReader)
  • статический Синхронизированный TextWriter (писатель TextWriter)

System.Text.RegularExpressions

  • статическое совпадение Синхронизировано (внутреннее совпадение)
  • статическое Синхронизация группы (внутренняя группа)

Более простой код с правильно инкапсулированным классом.

class Program
{
    private static LogBuilder _log = new LogBuilder();
    static void Main(string[] args)
    {
        var acts = new List<Func<string>>();
        var rnd = new Random();
        for (int i = 0; i < 10000; i++)
        {
            int local_i = i;
            acts.Add(() =>
            {
                var delay = rnd.Next(5);
                Thread.Sleep(delay);
                return local_i.ToString() + " act that that lasted " + delay.ToString();
            });
        }
        try
        {
            Parallel.ForEach(acts, act =>
            {
                _log.WriteLine(act.Invoke());
            });
        }
        finally
        {
            _log.Dispose();
        }
    }
}

public class LogBuilder : IDisposable
{
    private FileStream _fileStream;
    private StreamWriter _streamWriter;
    private TextWriter _synchronizedWriter;

    public LogBuilder()
    {
        _fileStream = new FileStream(@"C:\temp\log.txt", FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite);
        _streamWriter = new StreamWriter(_fileStream) { AutoFlush = true };
        _synchronizedWriter = TextWriter.Synchronized(_streamWriter);
    }
    public void WriteLine(string message)
    {
        _synchronizedWriter.WriteLine(message);
    }

    public void Dispose()
    {
        _streamWriter.Close(); _streamWriter.Dispose(); _fileStream.Close(); _fileStream.Dispose();
    }
}
0 голосов
/ 17 февраля 2012

Кажется, что это не проблема параллелизма. Это проблема StringBuilder.

Я заменил:

_streamWriter.WriteLine(Log.ToString());

с:

for (int i = 0; i < Log.Length; i++)
{
    _streamWriter.Write(Log[i]);
}

И это сработало. Для дальнейшего использования: http://msdn.microsoft.com/en-us/library/system.text.stringbuilder(v=VS.100).aspx

Раздел выделения памяти.

...