Мне кажется, вы хотите что-то очень похожее на BlockingCollection<T>
, которое использует Task
s и await
ing вместо блокировки.
В частности, то, что вы можете добавить, не блокируя и не ожидая. Но когда вы пытаетесь удалить элемент, когда в данный момент его нет, вы можете await
, пока какой-либо элемент не будет доступен.
Публичный интерфейс может выглядеть так:
public class AsyncQueue<T>
{
public bool IsCompleted { get; }
public Task<T> DequeueAsync();
public void Enqueue(T item);
public void FinishAdding();
}
FinishAdding()
необходимо, чтобы мы знали, когда закончить удаление из очереди.
При этом ваш код может выглядеть следующим образом (m_queue
is AsyncQueue<File>
):
var tasks = Enumerable.Range(0, 10)
.Select(i => DownloadAndEnqueue(i))
.ToArray();
Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());
…
static async Task DownloadAndEnqueue(string url)
{
m_queue.Enqueue(await DownloadFile(url));
}
Это не так хорошо, как то, что вы себе представляли, может работать, но оно должно работать.
А реализация AsyncQueue<T>
? Есть две очереди. Один для завершенной работы, который еще не снят с производства. Другой - для Task
с (на самом деле, TaskCompletionSource<T>
), которые были уже сняты с производства, но пока не дали никакого результата.
Когда вы удаляете очередь из очереди и в очереди есть выполненная работа, просто верните работу оттуда (используя Task.FromResult()
). Если очередь пуста, создайте новую Task
, добавьте ее в другую очередь и верните ее.
Когда вы ставите в очередь завершенную работу и в очереди есть Task
, удалите одну и завершите ее, используя результат, который мы имеем сейчас. Если очередь Task
пуста, добавьте работу в первую очередь.
При этом вы можете снимать и ставить в очередь столько раз, сколько захотите, и это будет работать правильно. Если вы знаете, что новой работы не будет, звоните FinishAdding()
. Если есть какие-либо ожидающие Task
с, они выдадут исключение.
Другими словами:
public class AsyncQueue<T>
{
private readonly object m_lock = new object();
private bool m_finishedAdding = false;
private readonly Queue<T> m_overflowQueue = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
new Queue<TaskCompletionSource<T>>();
public bool IsCompleted
{
get { return m_finishedAdding && m_overflowQueue.Count == 0; }
}
public Task<T> DequeueAsync()
{
Task<T> result;
lock (m_lock)
{
if (m_overflowQueue.Count > 0)
result = Task.FromResult(m_overflowQueue.Dequeue());
else if (!m_finishedAdding)
{
var tcs = new TaskCompletionSource<T>();
m_underflowQueue.Enqueue(tcs);
result = tcs.Task;
}
else
throw new InvalidOperationException();
}
return result;
}
public void Enqueue(T item)
{
lock (m_lock)
{
if (m_finishedAdding)
throw new InvalidOperationException();
if (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetResult(item);
}
else
m_overflowQueue.Enqueue(item);
}
}
public void FinishAdding()
{
lock (m_lock)
{
m_finishedAdding = true;
while (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetException(new InvalidOperationException());
}
}
}
}
Если вы хотите ограничить размер рабочей очереди (и, следовательно, ограничить производителей, если они работают слишком быстро), вы можете также сделать Enqueue()
return Task
, что потребует другой очереди.