Я пытаюсь добавить свою задачу в настраиваемую параллельную очередь, но она продолжает запускаться. Как я могу просто добавить объект задачи в очередь, не запуская его, чтобы я мог запустить его позже в коде? В основном то, что он должен делать для каждого фрагмента, - это получить поток запросов, передать его в файл и одновременно запустить следующий фрагмент.
Моя настраиваемая параллельная очередь:
public sealed class EventfulConcurrentQueue<T> : ConcurrentQueue<T>
{
public ConcurrentQueue<T> Queue;
public EventfulConcurrentQueue()
{
Queue = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
Queue.Enqueue(item);
OnItemEnqueued();
}
public int Count => Queue.Count;
public bool TryDequeue(out T result)
{
var success = Queue.TryDequeue(out result);
if (success)
{
OnItemDequeued(result);
}
return success;
}
public event EventHandler ItemEnqueued;
public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;
void OnItemEnqueued()
{
ItemEnqueued?.Invoke(this, EventArgs.Empty);
}
void OnItemDequeued(T item)
{
ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
}
}
public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
public T Item { get; set; }
}
Код I Я использую для добавления задачи в Очередь:
Parallel.ForEach(pieces, piece =>
{
//Open a http request with the range
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);
//Send the request
var downloadTask = client.SendAsync(request).Result;
//Use interlocked to increment Tasks done by one
Interlocked.Add(ref OctaneEngine.TasksDone, 1);
//Add the task to the queue along with the start and end value
asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
downloadTask.Content.ReadAsStreamAsync().ContinueWith(
task =>
{
using (var fs = new FileStream(piece._tempfilename,
FileMode.OpenOrCreate, FileAccess.Write))
{
task.Result.CopyTo(fs);
}
}), piece));
});
Код, который я использую для последующего запуска задач:
Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
if (asyncTasks.Count > 0)
{
await Task.Run(() => task);
asyncTasks.TryDequeue(out task);
Interlocked.Add(ref TasksDone, 1);
}
});
Я не уверен, что происходит, поэтому любая помощь будет принята с благодарностью! Спасибо!