У меня нет опыта работы с потоками и асинхронной обработкой, поэтому я не уверен, что то, что я делаю, безопасно.
ОБНОВЛЕНИЕ: Это, к сожалению, ограничено .Net 3.5, поэтому нет ConcurrentQueue (разве кто-то реализовал его для .Net 3.5 или не знает альтернативную реализацию?)
ОБНОВЛЕНИЕ 2: Я нашел этот подход, использующий исходный код из реализации Mono ConcurrentQueue (который является совершенно совместимым кодом для .Net 3.5).
У меня есть один объект, задачей которого является внутреннее управление Queue<MyObject>
. Этот класс должен выполнять свои операции асинхронно (он будет работать непрерывно между событиями OnStart и OnStop службы Windows). В любом случае, я позволю коду говорить:
public class MyObjectQueueManager : IMyObjectQueueManager
{
private bool shouldContinueEnqueuing;
private readonly Queue<MyObject> queue;
private readonly IMyObjectIdentifier myObjectIdentifier;
public MyObjectQueueManager( IMyObjectIdentifier myObjectIdentifier )
{
this.myObjectIdentifier = myObjectIdentifier;
queue = new Queue<MyObject>();
}
public void StartQueue()
{
shouldContinueEnqueuing = true;
var enqueuer = new MyObjectEnqueuer( EnqueueMyObjects );
enqueuer.BeginInvoke( myObjectIdentifier, queue, StopEnqueuingMyObjects, new object() );
var dequeuer = new MyObjectDequeuer( DequeueMyObjects );
dequeuer.BeginInvoke( queue, StopDequeuingMyObjects, new object() );
}
public void StopQueue()
{
shouldContinueEnqueuing = false;
}
public event EventHandler<NextMyObjectEventArgs> OnNextMyObjectAvailable;
private void EnqueueMyObjects( IMyObjectIdentifier identifier, Queue<MyObject> queue )
{
while ( shouldContinueEnqueuing )
{
var myObjects = identifier.GetMyObjects();
foreach ( var myObject in myObjects )
{
queue.Enqueue( myObject );
}
WaitForQueueToShrink( queue, 1000 /* arbitrary queue limiter - will come from settings. */ );
}
}
private void DequeueMyObjects( Queue<MyObject> queue )
{
while ( queue.Count > 0 || shouldContinueEnqueuing )
{
if ( queue.Count > 0 )
{
OnNextMyObjectAvailable( this, new NextMyObjectEventArgs( queue.Dequeue() ) );
}
Thread.Sleep( 1 );
}
}
private static void WaitForQueueToShrink( ICollection queue, int queueLimit )
{
while ( queue.Count > queueLimit )
{
Thread.Sleep( 10 );
}
}
private static void StopEnqueuingMyObjects( IAsyncResult result )
{
var asyncResult = ( AsyncResult ) result;
var x = ( MyObjectEnqueuer ) asyncResult.AsyncDelegate;
x.EndInvoke( asyncResult );
}
private static void StopDequeuingMyObjects( IAsyncResult result )
{
var asyncResult = ( AsyncResult ) result;
var x = ( MyObjectDequeuer ) asyncResult.AsyncDelegate;
x.EndInvoke( asyncResult );
}
private delegate void MyObjectEnqueuer( IMyObjectIdentifier myObjectIdentifier, Queue<MyObject> queue );
private delegate void MyObjectDequeuer( Queue<MyObject> queue );
}
Мои заметки / мысли / проблемы:
- Если я правильно понимаю Delegate.BeginInvoke, я порождаю два потока, и оба обращаются к Очереди (через параметры методов вызова).
- Одна нить ставит в очередь только элементы.
- Один поток только удаляет элементы из очереди, и он всегда проверяет, что элементы находятся в очереди первыми.
- Я не уверен, что это потокобезопасно - будет ли очередь заблокирована одним потоком и не будет доступна из другого потока?
- Если это так, какой альтернативный подход / метод я мог бы использовать, который позволил бы мне независимо ставить в очередь / удалять элементы без удержания основного рабочего потока?
- Если этот подход действительно как-то обоснован (вы никогда не знаете), будет ли он масштабироваться для обработки большой громкости? То есть если IMyObjectIdentifier возвращает тысячи элементов в секунду, сработает ли сценарий (конечно, я буду выполнять объемное тестирование для подтверждения).
В качестве бонуса я заметил, что есть аналогичный код, используемый для вызова EndInvoke из методов StopEnqueuingMyObjects и StopDequeuingMyObjects - я бы хотел рефакторинг для использования одного метода, если смогу, но когда я попытался использовать универсальный метод как это:
private static void StopProcessingMyObjects<T>( IAsyncResult result )
{
var asyncResult = ( AsyncResult ) result;
var x = ( T ) asyncResult.AsyncDelegate;
x.EndInvoke( asyncResult );
}
Ему это не нравится, потому что он не знает, что T
является delgate, и поэтому EndInvoke
не может быть вызван. Возможно ли это сделать?