Представление асинхронных последовательностей в C # 5 - PullRequest
19 голосов
/ 03 октября 2011

Как вы должны использовать C # 5 async для представления последовательности асинхронных задач?Например, если мы хотим загрузить пронумерованные файлы с сервера и вернуть каждый из них по мере его получения, как мы можем реализовать такой метод?

public async IEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; i++; ) {
        yield return await DownloadFile(string.Format(format, i));
    }
}

Ответы [ 5 ]

5 голосов
/ 04 октября 2011

Мне кажется, вы хотите что-то очень похожее на BlockingCollection<T>, которое использует Task s и await ing вместо блокировки.

В частности, то, что вы можете добавить, не блокируя и не ожидая. Но когда вы пытаетесь удалить элемент, когда в данный момент его нет, вы можете await, пока какой-либо элемент не будет доступен.

Публичный интерфейс может выглядеть так:

public class AsyncQueue<T>
{
    public bool IsCompleted { get; }

    public Task<T> DequeueAsync();

    public void Enqueue(T item);

    public void FinishAdding();
}

FinishAdding() необходимо, чтобы мы знали, когда закончить удаление из очереди.

При этом ваш код может выглядеть следующим образом (m_queue is AsyncQueue<File>):

var tasks = Enumerable.Range(0, 10)
    .Select(i => DownloadAndEnqueue(i))
    .ToArray();

Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());

…

static async Task DownloadAndEnqueue(string url)
{
    m_queue.Enqueue(await DownloadFile(url));
}

Это не так хорошо, как то, что вы себе представляли, может работать, но оно должно работать.

А реализация AsyncQueue<T>? Есть две очереди. Один для завершенной работы, который еще не снят с производства. Другой - для Task с (на самом деле, TaskCompletionSource<T>), которые были уже сняты с производства, но пока не дали никакого результата.

Когда вы удаляете очередь из очереди и в очереди есть выполненная работа, просто верните работу оттуда (используя Task.FromResult()). Если очередь пуста, создайте новую Task, добавьте ее в другую очередь и верните ее.

Когда вы ставите в очередь завершенную работу и в очереди есть Task, удалите одну и завершите ее, используя результат, который мы имеем сейчас. Если очередь Task пуста, добавьте работу в первую очередь.

При этом вы можете снимать и ставить в очередь столько раз, сколько захотите, и это будет работать правильно. Если вы знаете, что новой работы не будет, звоните FinishAdding(). Если есть какие-либо ожидающие Task с, они выдадут исключение.

Другими словами:

public class AsyncQueue<T>
{
    private readonly object m_lock = new object();

    private bool m_finishedAdding = false;

    private readonly Queue<T> m_overflowQueue = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
        new Queue<TaskCompletionSource<T>>();

    public bool IsCompleted
    {
        get { return m_finishedAdding && m_overflowQueue.Count == 0; }
    }

    public Task<T> DequeueAsync()
    {
        Task<T> result;
        lock (m_lock)
        {
            if (m_overflowQueue.Count > 0)
                result = Task.FromResult(m_overflowQueue.Dequeue());
            else if (!m_finishedAdding)
            {
                var tcs = new TaskCompletionSource<T>();
                m_underflowQueue.Enqueue(tcs);
                result = tcs.Task;
            }
            else
                throw new InvalidOperationException();
        }
        return result;
    }

    public void Enqueue(T item)
    {
        lock (m_lock)
        {
            if (m_finishedAdding)
                throw new InvalidOperationException();

            if (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetResult(item);
            }
            else
                m_overflowQueue.Enqueue(item);
        }
    }

    public void FinishAdding()
    {
        lock (m_lock)
        {
            m_finishedAdding = true;

            while (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetException(new InvalidOperationException());
            }
        }
    }
}

Если вы хотите ограничить размер рабочей очереди (и, следовательно, ограничить производителей, если они работают слишком быстро), вы можете также сделать Enqueue() return Task, что потребует другой очереди.

5 голосов
/ 03 октября 2011

Истинная последовательность не работает напрямую с async / await, потому что задачи возвращают только одно значение. Вам нужен фактический перечисляемый тип, такой как IAsyncEnumerator<T> в Ix-Async (или AsyncEx ). Дизайн IAsyncEnumerator<T> описан в этом канале 9 видео .

3 голосов
/ 05 сентября 2012

Я знаю, что это было давно, но я написал что-то, чтобы точно эмулировать "возвращаемую доходность" для асинхронных перечислимых значений здесь . Не требуется сложный код.

Вы используете это как:

public IAsyncEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    return AsyncEnumerable.Create(async y =>
    {
        for (int i = 0; i++; ) {
            await y.YieldReturn(await DownloadFile(string.Format(format, i)));
        }
    };
}

Я обычно избегаю рекламировать здесь свой собственный код, но это явно необходимая функция в C # 6.0, поэтому я надеюсь, что она окажется полезной в C # 5.0, если вы все еще застряли на этом.

0 голосов
/ 25 ноября 2014

Преимущество async заключается в том, что вызывающий метод может вызывать несколько операций блокировки параллельно и блокировать их только тогда, когда требуется возвращаемое значение. Такая же возможность возможна в этом сценарии с yield/return с использованием типа возврата IEnumerable<Task>.

public IEnumerable<Task<File>> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; i++; ) {
        yield return DownloadFileAsync(string.Format(format, i));
    }
}

По аналогии с async/await вызывающий метод теперь может продолжать выполняться до тех пор, пока ему не понадобится следующее значение, при котором точка await/.Result может быть вызвана для следующей задачи. Следующий метод расширения демонстрирует это:

    public static IEnumerable<T> Results<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks)
            yield return task.Result;
    }

Если вызывающий метод хотел бы убедиться, что все IEnumerable Task созданы и работают параллельно, тогда может быть полезен такой метод расширения, как приведенный ниже (этот и вышеупомянутый метод, вероятно, уже стандартная библиотека):

    public static IEnumerable<T> ResultsParallel<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks.ToArray())
            yield return task.Result;
    }

Обратите внимание, как ответственность за то, что выполняется параллельно, передается вызывающему методу так же, как это было с async/await. В случае, если есть опасения относительно создания блокировки Task s, может быть создан метод расширения, такой как следующий:

    public static Task<IEnumerable<T>> ResultsAsync<T>(this IEnumerable<Task<T>> tasks)
    {
        var startedTasks = new ConcurrentQueue<Task<T>>();
        var writerTask = new Task(() =>
            {
                foreach (var task in tasks)
                {
                    startedTasks.Enqueue(task);
                }
            });
        writerTask.Start();

        var readerTask = new Task<IEnumerable<T>>(() =>
        {
            return ResultsSequential(startedTasks, () => writerTask.IsCompleted);
        });
        readerTask.Start();
        return readerTask;
    }

    private static IEnumerable<T> ResultsSequential<T>(ConcurrentQueue<Task<T>> tasks, Func<bool> isDone)
    {
        while (true)
        {
            Task<T> task;
            if (isDone.Invoke())
            {
                if (tasks.TryDequeue(out task))
                {
                    yield return task.Result;
                }
                else
                {
                    yield break;
                }
            } else if (tasks.TryDequeue(out task))
            {
                yield return task.Result;
            }
        }
    }

Эта реализация не очень эффективна. Эффективная реализация слишком велика, чтобы поместиться на полях .

0 голосов
/ 21 июня 2014

Если бы у вас было только конечное количество URL, вы могли бы сделать это:

    public async Task<IEnumerable<File>> DownloadPictures()
    {
        const string format = "http://example.com/files/{0}.png";
        var urls = Enumerable.Range(0, 999).Select(i => String.Format(format, i));
        var tasks = urls.Select(u => DownloadFile(u));
        var results = Task.WhenAll(tasks);
        return await results;
    }

Ключ в том, чтобы получить список задач, а затем вызвать Task.WhenAll в этом списке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...