Эквивалент Parallel.For localInit и localFinally для использования в блоках потока данных TPL, когда MaxDegreeOfParallelism> 1 - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть TransformBlock<int, int>, который имеет MaxDegreeOfParallelism = 6.Я также определил, что Func<int, int>, который передается конструктору блока (который будет выполняться для каждого размещенного элемента), может быть логически разбит на дорогую процедуру инициализации и тело, которое мутирует локальные переменные функции.Было бы более эффективно, если бы я мог реорганизовать функцию в класс с именем TransformBlockState, выполнить инициализацию один раз для одновременного действия (точно так же как localInit обратный вызов *1006*) и затем разрешить поток данных TPL, чтобы убедиться, что состояниеникогда не видоизменяется более чем одним элементом за раз.

До рефакторинга:

Func<int, int> original = x => {
    // method local variables
    // expensive initialization routine to setup locals
    // perform action on local variables
    // potentially expensive teardown
}

После рефакторинга:

public sealed class TransformBlockState<TIn, TOut> : IDisposable
{
    // instance state

    public TransformBlockState()
    {
        // expensive initialization routine
    }

    public TOut Transform(TIn value)
    {
        // called many times but never concurrently for the same instance
    }

    public void Dispose()
    {
        // tear down state
    }
}

Что-то похожее на обратные вызовы localInit (для .ctor) и localFinally (для Dispose) уже существует в библиотеке потоков данных TPL?

Я хочу избежать использования ConcurrentStack<TransformBlockState> (много ненужных блокировок), и я хочу избежать сохранения TransformBlockState в поле [ThreadStatic] (поскольку нет гарантии, что Task не будет выполняться на нескольких потоках (последовательно, очевидно) или на нескольких Taskв одном потоке (возможно, все блокируются при вводе / выводе)).

Ответы [ 3 ]

0 голосов
/ 03 февраля 2019

Нет эквивалента loclaInit или localFinally.Вы можете создать аналогичное поведение с конвейером блоков или, возможно, использовать пул соединений, если это дорогостоящая инициализация.Но вам, возможно, придется пересмотреть свою проблему, и TPL-Dataflow может не подойти.Не зная больше о точной проблеме, решить ее сложно.Но обычно любую единственную инициализацию / для каждого ввода следует выполнять вне потока и передавать внутрь.

Но, как я уже говорил, вы можете использовать конвейер для получения чего-то вроде Parallel.Foreach, хотя на самом деле это может быть не то, что выИщите.

public class DataflowPipeline
{
    private TransformBlock<IEnumerable<int>, IEnumerable<Locals>> Initialize { get; }
    private TransformManyBlock<IEnumerable<Locals>, Locals> Distribute { get; }
    private TransformBlock<Locals, Result> Compute { get; }
    //other blocks, results, disposal etc.


    public DataflowPipeline()
    {
        var sequential = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 };
        var parallel = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 6 };

        Initialize = new TransformBlock<IEnumerable<int>, IEnumerable<Locals>>(
            inputs => inputs.Select(x => new Locals() { ExpensiveItem = string.Empty, Input = x }),
            sequential);
        Distribute = new TransformManyBlock<IEnumerable<Locals>, Locals>(x => x, sequential);
        Compute = new TransformBlock<Locals, Result>(
            local => new Result() { ExpensiveItem = local.ExpensiveItem, Output = local.Input * 2 },
            parallel);

        //Other blocks, link, complete etc.
    }
}
0 голосов
/ 13 февраля 2019

Я думаю, у меня есть лучший пример - мне нужно получить несколько тысяч записей о билетах от авиакомпаний (на самом деле GDS).Для этого мне нужно установить дорогой сеанс , прежде чем я смогу отправить запрос SOAP или REST.Сеансы ограничены, поэтому я действительно не хочу создавать новый для каждого билета.Это удваивает время, необходимое для запроса и тратит деньги и ресурсы.

Создание пользовательского блока может показаться решением, но на самом деле не это хорошо.Потоки данных устанавливают конвейеры блоков обработки, которые работают с потоком сообщений.Попытка заставить их работать по-другому столкнется с фундаментальными допущениями модели потока данных.

Например, задачи используются для параллелизма, регулирования и распределения нагрузки - опция MaxMessagesPerTask убивает задачи после получения максимального количества сообщений, чтобы одна задача не загружала процессор длядолго.Создание и уничтожение сеансов для каждой задачи может привести к поломке этого механизма. и в итоге создадут больше сеансов, чем необходимо.

Объединение в пул

Один из способов справиться с этим,заключается в использовании пула объектов, снабженного «дорогими» объектами, которые будут использоваться блоками, в данном случае Sessions.Как ни странно, пакет Microsoft.Extensions.ObjectPool предлагает именно такой пул.Документы не существуют , они обманчиво помещаются в дерево ASP.NET, но это отдельный пакет .NET Standard 2.0.Github source обманчиво прост, и класс использует Interlocked.CompareExchange, чтобы избежать блокировки.Есть даже реализация LeakTrackingObjectPool.

Если бы я знал об этом в прошлом, я мог бы написать:

var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());

Политика DefaultPooledObjectPolicy просто использует new длясоздать новый экземпляр.Однако легко создать новую политику, например такую, которая использует собственную логику создания или даже фабричный метод:

public class SessionPolicy : DefaultPooledObjectPolicy<Session>
{
    public override Session Create()
    {
        //Do whatever is needed here
        return session;
    }
}

Перенаправление

Другой вариант заключается в использовании несколько блоков экземпляров и исходный блок для ссылки на все из них.Чтобы избежать отправки всех сообщений в первый блок, необходима ограниченная емкость.Допустим, у нас есть этот фабричный метод:

TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)
{
    var session=CreateSomeSessionFrom(someSettings);
    var bounded=new DataflowBlockOptions {BoundedCapacity =1};
    return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);
}

И мы используем его для создания нескольких блоков:

_blocks=Enumerable.Range(0,10)
                  .Select(_=>CreateThatBlockWithSession(settings))
                  .ToArray();

Исходный блок может подключаться ко всем этим блокам:

foreach(var target in _blocks)
{
    _source.LinkTo(target,options);
}

А затем свяжите все эти блоки со следующим блоком.Сложность в том, что мы не можем просто распространять завершение.Если один из блоков завершен, он заставит следующий блок завершиться, даже если в других блоках ожидают сообщения.

Решение состоит в том, чтобы использовать Task.WhenAll и ContinueWith для передачи завершения доследующий блок:

foreach(var target in _blocks)
{
    target.LinkTo(_nextBlock);
}

var allTasks=_blocks.Select(blk=>blk.Completion);
Task.WhenAll(allTasks)
    .ContinueWith(_=>_nextBlock.Complete());

Более надежная реализация будет проверять состояние всех задач IsFaulted и вызывать Fault() в следующем блоке, если одна из них не будет выполнена

0 голосов
/ 01 февраля 2019

Если вы хотите иметь блок с состоянием, TransformBlock (или ActionBlock), вы создаете функцию, которая создает блок, помещает состояние в локальные переменные и захватывает их:

private IPropagatorBlock<int,int> CreateMyBlock()
{
    var state = 0;
    return new TransformBlock<int,int>( x => x+state++ );
}

Таким образом, ваш класс неявно создается компилятором.

...