Итерация IAsyncEnumerable в функции, возвращающей IAsyncEnumerable с отменой - PullRequest
3 голосов
/ 08 ноября 2019

Как гласит заголовок, мне нужно выполнить следующую функцию:

public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        yield return job;
    }
}

У меня проблемы с намоткой головы вокруг того места, где идет токен отмены, и неприятное ощущение, что я использую его в слишком многих местах.

Что на самом деле происходит здесь, когда вы деконструируете все причудливые асинхронные вещи? И есть ли лучшие способы написать эту функцию?

Ответы [ 2 ]

0 голосов
/ 08 ноября 2019

Представьте себе, что где-то глубоко внутри Entity Framework есть метод GetJobs, который извлекает Job объекты из базы данных:

private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await dataReader.ReadAsync(cancellationToken))
    {
        yield return new Job()
        {
            Id = (int)dataReader["Id"],
            Data = (byte[])dataReader["Data"]
        };
    }
}

Теперь представьте, что свойство Data содержит огромный байтовый массивс данными, связанными с Job. Извлечение массива каждого Job может занять некоторое нетривиальное время. В этом случае разрыва цикла между итерациями будет недостаточно, поскольку будет заметная задержка между вызовом метода Cancel и повышением OperationCanceledException. Вот почему для метода DbDataReader.ReadAsync требуется CancellationToken, чтобы запрос можно было немедленно отменить.

Теперь задача состоит в том, как передать CancellationToken, переданный клиентом. код для метода GetJobs, когда свойство типа context.Jobs находится в процессе. Решением является метод расширения WithCancellation, который сохраняет токен и передает его глубже методу, принимающему аргумент, украшенный атрибутом EnumeratorCancellation.

Так что в вашем случае вы все сделали правильно. Вы включили аргумент cancellationToken в свой метод возврата IAsyncEnumerable, что является рекомендуемой практикой. Таким образом, последующий WithCancellation, связанный с вашим GetByPipeline методом, не будет потрачен впустую. Затем вы связали WithCancellation после AsAsyncEnumerable внутри вашего метода, что также правильно. В противном случае CancellationToken не достигнет конечного пункта назначения, метод GetJobs.

0 голосов
/ 08 ноября 2019

Для начала этот метод может быть уменьшен до:

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();
}

или даже

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

Метод ничего не делает с job, поэтому он не нуженитерировать по нему.

Отмена

Что если метод на самом деле использовал job, где должен использоваться токен отмены?

Давайте очистимдо метода немного. Эквивалент:

public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId, 
      [EnumeratorCancellation] CancellationToken ct = default)
{
    //Just a query, doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    {
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    }
}

IQueryable query ничего не выполняет, он представляет запрос. Это не нуждается в отмене.

AsAsyncEnumerable(), AsEnumerable(), ToList() и т. Д. выполнить запрос и вернуть некоторый результат. ToList() и т. Д. Потребляют все результаты, тогда как методы As...Enumerable() дают результаты только по запросу. Запрос не может быть отменен, методы As_Enumerable() не будут ничего возвращать, если его не попросят, поэтому они не требуют отмены.

await foreach будет выполнять итерацию по всему асинхронному потоку, поэтому, если мыЧтобы иметь возможность прервать его, нам нужно нужно передать маркер отмены.

Наконец, DoSometingExpensive(job); нужно отменить? Разве это так дорого, что мы хотим быть в состоянии вырваться из этого, если это займет слишком много времени? Или мы можем подождать, пока он не закончится, прежде чем выйти из цикла? Если требуется отмена, потребуется также CancellationToken.

ConfigureAwait

Наконец, ConfigureAwait(false) не участвует в отмене и может вообще не понадобиться,Без этого после каждого await выполнение возвращается в исходный контекст синхронизации. В настольном приложении это означало поток пользовательского интерфейса. Это то, что позволяет нам изменять пользовательский интерфейс в асинхронном обработчике событий.

Если GetByPipeline работает на настольном приложении и хочет изменить пользовательский интерфейс, ему придется удалить ConfugureAwait:

await foreach (var job in jobStream.WithCancellation(ct))
{
        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);
}

При ConfigureAwait(false) выполнение продолжается в пуле потоковпоток, и мы не можем коснуться пользовательского интерфейса.

Код библиотеки не должен влиять на возобновление выполнения, поэтому большинство библиотек используют ConfigureAwait(false) и оставляют окончательное решение разработчику UI.

Если GetByPipeline - это библиотечный метод, используйте ConfigureAwait(false).

...