Очередь доступа <T>от двух делегатов. Асинхронные методы BeginInvoke одновременно - PullRequest
0 голосов
/ 17 января 2012

У меня нет опыта работы с потоками и асинхронной обработкой, поэтому я не уверен, что то, что я делаю, безопасно.

ОБНОВЛЕНИЕ: Это, к сожалению, ограничено .Net 3.5, поэтому нет ConcurrentQueue (разве кто-то реализовал его для .Net 3.5 или не знает альтернативную реализацию?)

ОБНОВЛЕНИЕ 2: Я нашел этот подход, использующий исходный код из реализации Mono ConcurrentQueue (который является совершенно совместимым кодом для .Net 3.5).

У меня есть один объект, задачей которого является внутреннее управление Queue<MyObject>. Этот класс должен выполнять свои операции асинхронно (он будет работать непрерывно между событиями OnStart и OnStop службы Windows). В любом случае, я позволю коду говорить:

public class MyObjectQueueManager : IMyObjectQueueManager
{
    private bool shouldContinueEnqueuing;
    private readonly Queue<MyObject> queue;
    private readonly IMyObjectIdentifier myObjectIdentifier;

    public MyObjectQueueManager( IMyObjectIdentifier myObjectIdentifier )
    {
        this.myObjectIdentifier = myObjectIdentifier;
        queue = new Queue<MyObject>();
    }

    public void StartQueue()
    {
        shouldContinueEnqueuing = true;

        var enqueuer = new MyObjectEnqueuer( EnqueueMyObjects );
        enqueuer.BeginInvoke( myObjectIdentifier, queue, StopEnqueuingMyObjects, new object() );

        var dequeuer = new MyObjectDequeuer( DequeueMyObjects );
        dequeuer.BeginInvoke( queue, StopDequeuingMyObjects, new object() );
    }

    public void StopQueue()
    {
        shouldContinueEnqueuing = false;
    }

    public event EventHandler<NextMyObjectEventArgs> OnNextMyObjectAvailable;

    private void EnqueueMyObjects( IMyObjectIdentifier identifier, Queue<MyObject> queue )
    {
        while ( shouldContinueEnqueuing )
        {
            var myObjects = identifier.GetMyObjects();

            foreach ( var myObject in myObjects )
            {
                queue.Enqueue( myObject );
            }

            WaitForQueueToShrink( queue, 1000 /* arbitrary queue limiter - will come from settings. */ );
        }
    }

    private void DequeueMyObjects( Queue<MyObject> queue )
    {
        while ( queue.Count > 0 || shouldContinueEnqueuing )
        {
            if ( queue.Count > 0 )
            {
                OnNextMyObjectAvailable( this, new NextMyObjectEventArgs( queue.Dequeue() ) );
            }

            Thread.Sleep( 1 );
        }
    }

    private static void WaitForQueueToShrink( ICollection queue, int queueLimit )
    {
        while ( queue.Count > queueLimit )
        {
            Thread.Sleep( 10 );
        }
    }

    private static void StopEnqueuingMyObjects( IAsyncResult result )
    {
        var asyncResult = ( AsyncResult ) result;
        var x = ( MyObjectEnqueuer ) asyncResult.AsyncDelegate;

        x.EndInvoke( asyncResult );
    }

    private static void StopDequeuingMyObjects( IAsyncResult result )
    {
        var asyncResult = ( AsyncResult ) result;
        var x = ( MyObjectDequeuer ) asyncResult.AsyncDelegate;

        x.EndInvoke( asyncResult );
    }

    private delegate void MyObjectEnqueuer( IMyObjectIdentifier myObjectIdentifier, Queue<MyObject> queue );

    private delegate void MyObjectDequeuer( Queue<MyObject> queue );
}

Мои заметки / мысли / проблемы:

  • Если я правильно понимаю Delegate.BeginInvoke, я порождаю два потока, и оба обращаются к Очереди (через параметры методов вызова).
  • Одна нить ставит в очередь только элементы.
  • Один поток только удаляет элементы из очереди, и он всегда проверяет, что элементы находятся в очереди первыми.
  • Я не уверен, что это потокобезопасно - будет ли очередь заблокирована одним потоком и не будет доступна из другого потока?
  • Если это так, какой альтернативный подход / метод я мог бы использовать, который позволил бы мне независимо ставить в очередь / удалять элементы без удержания основного рабочего потока?
  • Если этот подход действительно как-то обоснован (вы никогда не знаете), будет ли он масштабироваться для обработки большой громкости? То есть если IMyObjectIdentifier возвращает тысячи элементов в секунду, сработает ли сценарий (конечно, я буду выполнять объемное тестирование для подтверждения).

В качестве бонуса я заметил, что есть аналогичный код, используемый для вызова EndInvoke из методов StopEnqueuingMyObjects и StopDequeuingMyObjects - я бы хотел рефакторинг для использования одного метода, если смогу, но когда я попытался использовать универсальный метод как это:

private static void StopProcessingMyObjects<T>( IAsyncResult result )
{
    var asyncResult = ( AsyncResult ) result;
    var x = ( T ) asyncResult.AsyncDelegate;

    x.EndInvoke( asyncResult ); 
}

Ему это не нравится, потому что он не знает, что T является delgate, и поэтому EndInvoke не может быть вызван. Возможно ли это сделать?

1 Ответ

3 голосов
/ 17 января 2012

Попробуйте ConcurrentQueue<T>. Это потокобезопасная реализация очереди. API немного отличается, поэтому для его реализации может потребоваться некоторая модификация текущего кода.

...