Многопоточное взаимодействие приложений с потоком логгера - PullRequest
6 голосов
/ 23 октября 2009

Здесь я снова с вопросами о многопоточности и упражнении по моему параллельному программированию класс.

У меня есть многопоточный сервер - реализованный с использованием .NET Модель асинхронного программирования - с GET ( загрузка ) и PUT ( загрузка ) файловые сервисы. Эта часть сделана и проверена.

Бывает, что в заявлении о проблеме говорится, что этот сервер должен иметь активность logging с минимальным влиянием на время отклика сервера, и это должно поддерживаться низким приоритетом thread - поток логгера - создан для этого эффекта. Все протоколирование сообщения должны быть переданы потоками , которые производят их этому протоколирующему потоку , с использованием механизма связи, который не может заблокировать thread , который вызывает его (помимо необходимой блокировки для обеспечения взаимного исключения) и предполагая, что некоторые сообщения регистрации могут быть проигнорированы.

Вот мое текущее решение, пожалуйста, помогите проверить, является ли это решением указанной проблемы:

using System;
using System.IO;
using System.Threading;

// Multi-threaded Logger
public class Logger {
    // textwriter to use as logging output
    protected readonly TextWriter _output;
    // logger thread
    protected Thread _loggerThread;
    // logger thread wait timeout
    protected int _timeOut = 500; //500ms
    // amount of log requests attended
    protected volatile int reqNr = 0;
    // logging queue
    protected readonly object[] _queue;
    protected struct LogObj {
        public DateTime _start;
        public string _msg;
        public LogObj(string msg) {
            _start = DateTime.Now;
            _msg = msg;
        }
        public LogObj(DateTime start, string msg) {
            _start = start;
            _msg = msg;
        }
        public override string ToString() {
            return String.Format("{0}: {1}", _start, _msg);
        }
    }

    public Logger(int dimension,TextWriter output) {
        /// initialize queue with parameterized dimension
        this._queue = new object[dimension];
        // initialize logging output
        this._output = output;
        // initialize logger thread
        Start();
    }
    public Logger() {
        // initialize queue with 10 positions
        this._queue = new object[10];
        // initialize logging output to use console output
        this._output = Console.Out;
        // initialize logger thread
        Start();
    }

    public void Log(string msg) {
        lock (this) {
            for (int i = 0; i < _queue.Length; i++) {
                // seek for the first available position on queue
                if (_queue[i] == null) {
                    // insert pending log into queue position
                    _queue[i] = new LogObj(DateTime.Now, msg);
                    // notify logger thread for a pending log on the queue
                    Monitor.Pulse(this);
                    break;
                }
                // if there aren't any available positions on logging queue, this
                // log is not considered and the thread returns
            }
        }
    }

    public void GetLog() {
        lock (this) {
            while(true) {
                for (int i = 0; i < _queue.Length; i++) {
                    // seek all occupied positions on queue (those who have logs)
                    if (_queue[i] != null) {
                        // log
                        LogObj obj = (LogObj)_queue[i];
                        // makes this position available
                        _queue[i] = null;
                        // print log into output stream
                        _output.WriteLine(String.Format("[Thread #{0} | {1}ms] {2}",
                                                        Thread.CurrentThread.ManagedThreadId,
                                                        DateTime.Now.Subtract(obj._start).TotalMilliseconds,
                                                        obj.ToString()));
                    }
                }
                // after printing all pending log's (or if there aren't any pending log's),
                // the thread waits until another log arrives
                //Monitor.Wait(this, _timeOut);
                Monitor.Wait(this);
            }
        }
    }

    // Starts logger thread activity
    public void Start() {
        // Create the thread object, passing in the Logger.Start method
        // via a ThreadStart delegate. This does not start the thread.
        _loggerThread = new Thread(this.GetLog);
        _loggerThread.Priority = ThreadPriority.Lowest;
        _loggerThread.Start();
    }

    // Stops logger thread activity
    public void Stop() {
        _loggerThread.Abort();
        _loggerThread = null;
    }

    // Increments number of attended log requests
    public void IncReq() { reqNr++; }

}

В основном, вот основные пункты этого кода:

  1. Запуск низкоприоритетного потока , который зацикливает очередь журналов и печатает ожидающие журналы на выходе. После этого поток приостанавливается до поступления нового log ;
  2. Когда приходит журнал, поток регистратора пробуждается и выполняет свою работу.

Является ли это решение поточно-безопасным ? Я читал Производители-потребители Проблема и алгоритм решения, но в этой задаче, хотя у меня несколько производителей, у меня только один читатель.

Заранее спасибо за все ваше внимание.

Ответы [ 4 ]

4 голосов
/ 23 октября 2009

Кажется, это должно работать. Производители-потребители не должны сильно меняться в случае одного потребителя. Маленькие клыки:

  • получение блокировки может быть дорогой операцией (как говорит @Vitaliy Lipchinsky). Я бы порекомендовал сравнить ваш регистратор с простыми «сквозными» регистраторами и регистратором, используя блокированные операции. Другой альтернативой будет замена существующей очереди на пустую в GetLog и немедленный выход из критической секции. Таким образом, ни один из производителей не будет заблокирован длительными операциями с потребителями.

  • сделать ссылочный тип LogObj (класс). Нет смысла делать его структурированным, так как вы все равно его занимаетесь боксом. или сделать поле _queue типа LogObj[] (в любом случае это лучше).

  • создайте фоновый поток, чтобы он не мешал закрывать вашу программу, если не будет вызван Stop.

  • Промойте TextWriter. Или же вы рискуете потерять даже те записи, которым удалось уместить очередь (10 элементов - это немного мало, ИМХО)

  • Реализация IDisposable и / или финализатор. Ваш регистратор владеет потоком и средством записи текста, и они должны быть освобождены (и очищены - см. Выше).

3 голосов
/ 23 октября 2009

Привет. Быстро посмотрел, и, хотя он кажется поточно-ориентированным, я не думаю, что он особенно оптимален. Я бы предложил решение в этом направлении

ПРИМЕЧАНИЕ: просто прочитайте другие ответы. Далее следует довольно оптимальное, оптимистичное решение для блокировки на основе вашего собственного. Основное отличие заключается в блокировке внутреннего класса, минимизации «критических секций» и обеспечении плавного завершения потока. Если вы хотите вообще избежать блокировки, то можете попробовать некоторые из этих нестабильных «неблокирующих» связанных списков, как предлагает @Vitaliy Lipchinsky.

using System.Collections.Generic;
using System.Linq;
using System.Threading;

...

public class Logger
{
    // BEST PRACTICE: private synchronization object. 
    // lock on _syncRoot - you should have one for each critical
    // section - to avoid locking on public 'this' instance
    private readonly object _syncRoot = new object ();

    // synchronization device for stopping our log thread.
    // initialized to unsignaled state - when set to signaled
    // we stop!
    private readonly AutoResetEvent _isStopping = 
        new AutoResetEvent (false);

    // use a Queue<>, cleaner and less error prone than
    // manipulating an array. btw, check your indexing
    // on your array queue, while starvation will not
    // occur in your full pass, ordering is not preserved
    private readonly Queue<LogObj> _queue = new Queue<LogObj>();

    ...

    public void Log (string message)
    {
        // you want to lock ONLY when absolutely necessary
        // which in this case is accessing the ONE resource
        // of _queue.
        lock (_syncRoot)
        {
            _queue.Enqueue (new LogObj (DateTime.Now, message));
        }
    }

    public void GetLog ()
    {
        // while not stopping
        // 
        // NOTE: _loggerThread is polling. to increase poll
        // interval, increase wait period. for a more event
        // driven approach, consider using another
        // AutoResetEvent at end of loop, and signal it
        // from Log() method above
        for (; !_isStopping.WaitOne(1); )
        {
            List<LogObj> logs = null;
            // again lock ONLY when you need to. because our log
            // operations may be time-intensive, we do not want
            // to block pessimistically. what we really want is 
            // to dequeue all available messages and release the
            // shared resource.
            lock (_syncRoot)
            {
                // copy messages for local scope processing!
                // 
                // NOTE: .Net3.5 extension method. if not available
                // logs = new List<LogObj> (_queue);
                logs = _queue.ToList ();
                // clear the queue for new messages
                _queue.Clear ();
                // release!
            }
            foreach (LogObj log in logs)
            {
                // do your thang
                ...
            }
        }
    }
}
...
public void Stop ()
{
    // graceful thread termination. give threads a chance!
    _isStopping.Set ();
    _loggerThread.Join (100);
    if (_loggerThread.IsAlive)
    {
        _loggerThread.Abort ();
    }
    _loggerThread = null;
}
1 голос
/ 23 октября 2009

На самом деле, вы вводите блокировку здесь. У вас есть блокировка при отправке записи журнала в очередь (метод Log): если 10 потоков одновременно поместили 10 элементов в очередь и разбудили поток Logger, то 11-й поток будет ожидать, пока поток регистратора зарегистрирует все элементы ...

Если вы хотите что-то действительно масштабируемое - используйте очередь без блокировки (пример ниже). С механизмом синхронизации очереди без блокировки будет очень просто (вы даже можете использовать один дескриптор ожидания для уведомлений).

Если вам не удастся найти реализацию очереди без блокировки в сети, вот идея, как это сделать: Используйте связанный список для реализации. Каждый узел в связанном списке содержит значение и переменную ссылку на следующий узел. поэтому для операций постановки и снятия вы можете использовать метод Interlocked.CompareExchange. Надеюсь, идея понятна. Если нет - дайте мне знать, и я предоставлю более подробную информацию.

0 голосов
/ 23 октября 2009

Я просто провожу здесь мысленный эксперимент, поскольку у меня нет времени, чтобы на самом деле попробовать код прямо сейчас, но я думаю, что вы можете сделать это вообще без блокировок, если вы креативны.

Ваш класс ведения журнала должен содержать метод, который выделяет очередь и семафор при каждом вызове (и другой, который освобождает очередь и семафор, когда поток завершается). Потоки, которые хотят вести регистрацию, будут вызывать этот метод при запуске. Когда они хотят войти, они помещают сообщение в свою очередь и устанавливают семафор. Поток регистратора имеет большой цикл, который проходит по очередям и проверяет связанные семафоры. Если семафор, связанный с очередью, больше нуля, то очередь отбрасывается, а семафор уменьшается.

Поскольку вы не пытаетесь вытолкнуть что-либо из очереди до тех пор, пока не установлен семафор, и вы не настроите семафор до тех пор, пока вы не вытолкнете вещи в очередь, я думаю, это будет в безопасности. Согласно документации MSDN для класса очереди, если вы перечисляете очередь, а другой поток изменяет коллекцию, создается исключение. Ловите это исключение, и вы должны быть хорошими.

...