Запретить запуск задачи при добавлении в параллельную очередь - PullRequest
0 голосов
/ 09 июля 2020

Я пытаюсь добавить свою задачу в настраиваемую параллельную очередь, но она продолжает запускаться. Как я могу просто добавить объект задачи в очередь, не запуская его, чтобы я мог запустить его позже в коде? В основном то, что он должен делать для каждого фрагмента, - это получить поток запросов, передать его в файл и одновременно запустить следующий фрагмент.

Моя настраиваемая параллельная очередь:

public sealed class EventfulConcurrentQueue<T> : ConcurrentQueue<T>
{
    public ConcurrentQueue<T> Queue;

    public EventfulConcurrentQueue()
    {
        Queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        Queue.Enqueue(item);
        OnItemEnqueued();
    }

    public int Count => Queue.Count;


    public bool TryDequeue(out T result)
    {
        var success = Queue.TryDequeue(out result);

        if (success)
        {
            OnItemDequeued(result);
        }
        return success;
    }

    public event EventHandler ItemEnqueued;
    public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;

    void OnItemEnqueued()
    {
        ItemEnqueued?.Invoke(this, EventArgs.Empty);
    }

    void OnItemDequeued(T item)
    {
        ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
    }
}

public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
    public T Item { get; set; }
}

Код I Я использую для добавления задачи в Очередь:

Parallel.ForEach(pieces, piece =>
{
    //Open a http request with the range
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var downloadTask = client.SendAsync(request).Result;

    //Use interlocked to increment Tasks done by one
    Interlocked.Add(ref OctaneEngine.TasksDone, 1);

    //Add the task to the queue along with the start and end value
    asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
        downloadTask.Content.ReadAsStreamAsync().ContinueWith(
        task =>
        {
            using (var fs = new FileStream(piece._tempfilename,
                FileMode.OpenOrCreate, FileAccess.Write))
            {
                task.Result.CopyTo(fs);
            }
        }), piece));
});

Код, который я использую для последующего запуска задач:

Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
    if (asyncTasks.Count > 0)
    {
        await Task.Run(() => task);
        asyncTasks.TryDequeue(out task);
        Interlocked.Add(ref TasksDone, 1);
    }
});

Я не уверен, что происходит, поэтому любая помощь будет принята с благодарностью! Спасибо!

1 Ответ

1 голос
/ 09 июля 2020

Мне кажется, что вы здесь слишком усложняете.

Я не уверен, что вам вообще нужна какая-либо очередь.

Этот подход позволяет работать с доступом к сети одновременно:

using var semaphore = new SemaphoreSlim(1);
var tasks = pieces.Select(piece =>
{
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var download = await client.SendAsync(request);

    //Use interlocked to increment Tasks done by one
    Interlocked.Increment(ref OctaneEngine.TasksDone);

    var stream = await download.Content.ReadAsStreamAsync();
    
    using (var fs = new FileStream(piece._tempfilename, FileMode.OpenOrCreate,
        FileAccess.Write))
    {
        await semaphore.WaitAsync(); // Only allow one concurrent file write
        await stream.CopyToAsync(fs);
    }

    semaphore.Release();

    Interlocked.Increment(ref TasksDone);
});

await Task.WhenAll(tasks);

Поскольку ваша работа связана с вводом-выводом, попытки использовать несколько потоков через Parallel.ForEach не принесут особой пользы; ожидание методов async освободит потоки для обработки других запросов.

...