TPL DataFlow с Lazy Source / потоком данных - PullRequest
0 голосов
/ 28 сентября 2018

Предположим, у вас есть TransformBlock с настроенным параллелизмом и вы хотите передавать данные через блок.Входные данные должны создаваться только тогда, когда конвейер действительно может начать их обработку.(И должен быть освобожден, как только он покинет трубопровод.)

Могу ли я достичь этого?И если да, то как?

В основном я хочу источник данных, который работает как итератор.Вот так:

public IEnumerable<Guid> GetSourceData()
{
    //In reality -> this should also be an async task -> but yield return does not work in combination with async/await ...
    Func<ICollection<Guid>> GetNextBatch = () => Enumerable.Repeat(100).Select(x => Guid.NewGuid()).ToArray();

    while (true)
    {
        var batch = GetNextBatch();
        if (batch == null || !batch.Any()) break;
        foreach (var guid in batch)
            yield return guid;
    }
}

Это приведет к + - 100 записей в памяти.ОК: больше, если блоки, которые вы добавляете в этот источник данных, будут некоторое время удерживать их в памяти, но у вас есть шанс получить только подмножество (/ поток) данных.


Немного справочной информации:

Я намереваюсь использовать это в сочетании с лазурной базой данных космоса, где источником могут быть все объекты в коллекции или фид изменений.Излишне говорить, что я не хочу, чтобы все эти объекты хранились в памяти.Так что это не может работать:

using System.Threading.Tasks.Dataflow;

public async Task ExampleTask()
{
    Func<Guid, object> TheActualAction = text => text.ToString();

    var config = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 5,
        MaxDegreeOfParallelism = 15
    };
    var throtteler = new TransformBlock<Guid, object>(TheActualAction, config);
    var output = new BufferBlock<object>();
    throtteler.LinkTo(output);

    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    throtteler.Post(Guid.NewGuid());
    //...
    throtteler.Complete();

    await throtteler.Completion;
}

Приведенный выше пример не годится, потому что я добавляю все элементы, не зная, действительно ли они «используются» блоком преобразования.Кроме того, меня не волнует выходной буфер.Я понимаю, что мне нужно отправить его куда-нибудь, чтобы я мог дождаться завершения, но после этого я не буду использовать буфер.Так что стоит просто забыть обо всем, что у него получается ...

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

PostAsync вернет false, если цель заполнена без блокировки.Хотя это может использоваться в цикле ожидания ожидания, это расточительно.SendAsync с другой стороны будет ждать, если цель заполнена:

public async Task ExampleTask()
{
    var config = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 50,
        MaxDegreeOfParallelism = 15
    };
    var block= new ActionBlock<Guid, object>(TheActualAction, config);

    while(//some condition//)
    { 
        var data=await GetDataFromCosmosDB();
        await block.SendAsync(data);
        //Wait a bit if we want to use polling
        await Task.Delay(...);
    }

    block.Complete();
    await block.Completion;
}
0 голосов
/ 28 сентября 2018

Кажется, вы хотите обрабатывать данные с определенной степенью параллелизма (MaxDegreeOfParallelism = 15).Поток данных TPL очень неудобен в использовании для такого простого требования.

Существует очень простой и мощный шаблон, который может решить вашу проблему.Это параллельный асинхронный цикл foreach, как описано здесь: https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

Затем вы можете написать что-то вроде:

var dataSource = ...; //some sequence
dataSource.ForEachAsync(15, async item => await ProcessItem(item));

Очень просто.

Вы можете динамическиуменьшить DOP с помощью SemaphoreSlim.Семафор выступает в качестве шлюза, который позволяет динамически изменять только N одновременных потоков / задач. N 1014 *

Таким образом, вы бы использовали ForEachAsync в качестве базовой рабочей лошадки, а затем добавили дополнительные ограничения и регулирование сверху.

...