Потоковая передача данных через IEnumerable & TPL Dataflow - PullRequest
1 голос
/ 25 октября 2019

Я получаю элементы из вышестоящего API, который довольно медленный. Я пытаюсь ускорить это с помощью потока данных TPL для создания нескольких соединений и их объединения, например:

class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();
}

Проблема заключается в том, что мне нужно подождать, пока я закончу запрашивать всю коллекцию Stuff прежде чем я смогу вернуть его звонящему. Я предпочел бы, чтобы всякий раз, когда любой из нескольких параллельных запросов возвращал элемент, я возвращал этот элемент yield return. Поэтому мне не нужно возвращать sync Task<IEnumerable<Stuff>>, я могу просто вернуть IEnumerable<Stuff>, и вызывающая сторона выполняет итерацию, как только возвращаются какие-либо элементы.

Я попытался сделать это так;

IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    processor.Completion.Wait();

    yield break;
}

Но я получаю ошибку

Оператор yield не может использоваться внутри анонимного метода или лямбда-выражения

Как я могу реструктурировать свой код?

Ответы [ 2 ]

0 голосов
/ 25 октября 2019

Вы можете вернуть IEnumerable, но для этого вы должны заблокировать ваш текущий поток. Вам необходим TransformBlock для обработки идентификаторов и задача-фидер, которая асинхронно передает TransformBlock с идентификаторами. Наконец, текущий поток войдет в цикл блокировки, ожидая, когда произведенный материал даст:

static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var processor = new TransformBlock<int, Stuff>(async id =>
    {
        return await GetStuffById(id);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 50, // Avoid buffering millions of ids
    });

    var completionCTS = new CancellationTokenSource();
    var feederTask = Task.Run(async () =>
    {
        foreach (int id in ids)
        {
            if (completionCTS.IsCancellationRequested) break;
            await processor.SendAsync(id).ConfigureAwait(false);
        }
        processor.Complete();
    });

    try
    {
        while (processor.OutputAvailableAsync().Result)
        {
            while (processor.TryReceive(out var stuff))
            {
                yield return stuff;
            }
        }
        feederTask.Wait(); // To propagate exceptions
        processor.Completion.Wait(); // To propagate exceptions
    }
    finally // This runs when the caller completes the enumeration
    {
        completionCTS.Cancel();
    }
}

Нет необходимости ConcurrentBag, поскольку TransformBlock имеет внутренний выходной буфер. Сложная часть имеет дело со случаем, когда вызывающая сторона откажется от перечисления IEnumerable<Stuff>, сделав ранний разрыв или будучи заблокирована исключением. В этом случае вы не хотите, чтобы задача-фидер продолжала качать IEnumerable<int> с идентификаторами до конца. К счастью есть решение . Заключение цикла уступки в блок try / finally позволяет получить уведомление об этом событии, что позволяет своевременно завершить задачу-фидер.

Альтернативная реализация может устранить необходимость в фидере-задача, объединяя прокачку идентификаторов, кормление блока и сдачу материала в одну петлю. В этом случае вы бы хотели отставание между накачкой и урожайностью. Для этого может быть полезен метод расширения MoreLinq Lag (или Lead).


Обновление: Вот другая реализация, которая перечисляет и возвращает в одном и том же цикле. Для достижения желаемого отставания перечисляемый источник дополняется справа некоторыми фиктивными элементами, число которых равно степени параллелизма.

Эта реализация принимает универсальные типы вместо int и Stuff.

public static IEnumerable<TResult> Transform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)
{
    var processor = new TransformBlock<TSource, TResult>(async item =>
    {
        return await taskFactory(item);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = degreeOfConcurrency
    });

    var paddedSource = source.Select(item => (item, true))
        .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
    int index = -1;
    bool completed = false;
    foreach (var (item, hasValue) in paddedSource)
    {
        index++;
        if (hasValue) { processor.Post(item); }
        else if (!completed) { processor.Complete(); completed = true; }
        if (index >= degreeOfConcurrency)
        {
            if (!processor.OutputAvailableAsync().Result) break; // Blocking call
            if (!processor.TryReceive(out var result))
                throw new InvalidOperationException(); // Should never happen
            yield return result;
        }
    }
    processor.Completion.Wait();
}

Пример использования:

IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);

Обе реализации можно тривиально изменить, чтобы они возвращали IAsyncEnumerable вместо IEnumerable, чтобы избежать блокировки вызывающего потока.

0 голосов
/ 25 октября 2019

Вероятно, есть несколько разных способов справиться с этим в зависимости от вашего конкретного случая использования. Но чтобы обрабатывать элементы по мере их поступления в терминах потока данных TPL, вы должны изменить исходный блок на TransformBlock<,> и передать элементы в другой блок для обработки ваших элементов. Обратите внимание, что теперь вы можете избавиться от сбора ConcurrentBag и обязательно установить EnsureOrdered в false, если вам все равно, в каком порядке вы получаете ваши предметы. Также связывайте блоки и распространяйте завершение, чтобы гарантировать, что ваш конвейер завершитсякак только все элементы будут извлечены и впоследствии обработаны.

class Stuff
{
    int Id { get; }
}

public class GetStuff
{
    async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

    async Task GetLotsOfStuff(IEnumerable<int> ids)
    {
        //var bagOfStuff = new ConcurrentBag<Stuff>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        };

        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());

        processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });

        foreach (int id in ids)
        {
            processor.Post(id);
        }

        processor.Complete();
        await handler.Completion;
    }
}

Другими вариантами может быть превращение вашего метода в наблюдаемую потоковую передачу из TransformBlock или использование от IAsyncEnumerable до yield return и асинхронный метод получения.

...