Как правильно управлять Завершение в потоке данных TPL - PullRequest
4 голосов
/ 24 марта 2019

Я создал нечто похожее на веб-сканер для создания отчета о 1000+ веб-сервисах, которыми я должен управлять.Поэтому я создал TPL Dataflow Pipeline для управления получением и обработкой данных.Конвейер, который я представлял, выглядит примерно так (извините за мои навыки рисования: D): The Pipeline

Я уже создал реализацию, и все работало нормально, пока я не запустил свой конвейер какцелое.Я передал 500 объектов в конвейер в качестве входных данных в конвейер и ожидал, что программа будет работать некоторое время, но программа остановилась после перехода в блок выполнения.После проверки хода выполнения программы мне показалось, что Завершение быстро распространяется к Блоку утилизации.Я создал небольшой пример проекта с тем же конвейером, чтобы проверить, была ли это моя реализация классов ввода или самого конвейера.Пример кода:

public class Job
{
    public int Ticker { get; set; }

    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    {
        Console.WriteLine("Trying to finish");
        return Ticker == 1000000;
    }

    public void IntermediateCleanUp()
    {
        Console.WriteLine("intermediate Cleanup");
        Ticker = Ticker - 120;
    }

    public void finalCleanUp()
    {
        Console.WriteLine("Final Cleanup");
        Ticker = -1;
    }
}

Это мой класс ввода, который введен в блок подготовки.

public class Dataflow
{
    private TransformBlock<Job, Job> _preparationsBlock;

    private BufferBlock<Job> _balancerBlock;

    private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };

    private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    private TransformBlock<Job, Job> _typeATickBlock;

    private TransformBlock<Job, Job> _typeBTickBlock;

    private TransformBlock<Job, Job> _writeBlock;

    private TransformBlock<Job, Job> _intermediateCleanupBlock;

    private ActionBlock<Job> _finalCleanupBlock;

    public async Task Process()
    {
        CreateBlocks();

        ConfigureBlocks();

        for (int i = 0; i < 500; i++)
        {
            await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
        }
        _preparationsBlock.Complete();

        await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
    }

    private void CreateBlocks()
    {
        _preparationsBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Prepare();
            return job;
        }, _options);

        _balancerBlock = new BufferBlock<Job>(_options);

        _typeATickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            return job;
        }, _options);

        _typeBTickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            await job.Tick();
            return job;
        }, _options);

        _writeBlock = new TransformBlock<Job, Job>(job =>
        {
            Console.WriteLine(job.Ticker);
            return job;
        }, _options);

        _finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);

        _intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
        {
            job.IntermediateCleanUp();
            return job;
        }, _options);
    }

    private void ConfigureBlocks()
    {
        _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

        _balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
        _balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

        _typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
        _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

        _typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

        _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
        _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

        _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
    }
}

Это мой конвейер потока данных, представляющий мою "иллюстрацию" выше: D,Все это выполняется в моем планировщике, который запускается в Programm.cs:

public class Scheduler
{
    private readonly Timer _timer;

    private readonly Dataflow _flow;


    public Scheduler(int intervall)
    {
        _timer = new Timer(intervall);
        _flow = new Dataflow();
    }

    public void Start()
    {
        _timer.AutoReset = false;
        _timer.Elapsed += _timer_Elapsed;
        _timer.Start();
    }

    private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            _timer.Stop();
            Console.WriteLine("Timer stopped");
            await _flow.Process().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            Console.WriteLine("Timer started again.");
            _timer.Start();
        }
    }
}

class Program
{
    static  void Main(string[] args)
    {
        var scheduler = new Scheduler(1000);
        scheduler.Start();

        Console.ReadKey();

    }
}

Вывод на консоль, который я получаю: Таймер остановлен. Подготовка к тикированию. Попытка к фиксации. Попытка завершить тику. Попытка к фиксации.закончить тикирование Попытка зафиксировать Попытка завершить тикирование Попытка завершить Попытка завершить тикирование Попытка завершить Попытка завершить тикирование Попытка завершить тикирование Попытка завершить тикаTicking Попытка фиксации Попытка завершить Попытка фиксации Попытка завершить

Кажется, что программа перестала работать в этот момент, потому что я не достигаю каких-либо точек останова или продолжаю работу.Я думаю, что все мои блоки уже получили сигнал завершения и поэтому перестают принимать любые новые предметы.Поэтому мой вопрос таков: как мне управлять сигналом завершения, чтобы конвейер заканчивался только тогда, когда больше нет работы?

1 Ответ

2 голосов
/ 25 марта 2019

Основной проблемой вашего потока является обратная связь с вашим тиковым блоком. Это вызывает две проблемы.

  • Обратное давление
  • Завершение потока

Первый: обратное давление

Когда _typeATickBlock подключен обратно к себе, он перестанет принимать все сообщения, как только достигнет своей емкости. В вашем случае 4, это означает, что как только у него будет 3 сообщения в выходном буфере и одно обрабатывается, он перестанет принимать и передавать сообщения. Вы можете увидеть это, добавив следующую строку в блок:

Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");

И выведет:

Tick Block 0/3

Чтобы исправить это, вы можете добавить любой блок буферизации, Buffer или Transform. Ключом будет ограниченная емкость буфера. В вашем случае каждое отдельное сообщение необходимо будет перенаправить обратно в тиковый блок. При этом вы знаете, что ваша емкость должна соответствовать объему сообщений в любой момент времени. В этом случае 500.

_printingBuffer = new TransformBlock<Job, Job>(job =>
{
    Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
    return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });

В вашем реальном коде вы можете не знать значение, и Unbounded может быть вашим лучшим вариантом, чтобы избежать блокировки вашего конвейера, но вы можете настроить это значение, учитывая ваш входящий объем.

Второй: поток завершения

С обратной связью в вашем конвейере распространение становится более сложным, чем просто установка параметров связи. Как только завершение попадает в тиковый блок, он прекращает принимать все сообщения, даже те, которые еще необходимо обработать. Чтобы этого избежать, нужно задерживать распространение, пока все сообщения не пройдут цикл. Сначала вы останавливаете распространение непосредственно перед тиковым блоком, а затем проверяете буферы на каждом блоке, который участвует в цикле. Затем, когда все буферы пустые, передайте завершение и ошибку в блок.

_balancerBlock.Completion.ContinueWith(tsk =>
{
    while (!_typeATickBlock.Completion.IsCompleted)
    {
        if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
        && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
        {
            _typeATickBlock.Complete();
        }
    }
});

Последнее

Ваша полная ConfigureBlocks с настройкой завершения и вставленным буфером должна выглядеть примерно так. Заметьте, что я только прошел успешно, а не ошибку, и я удалил ветку типа B.

private void ConfigureBlocks()
{
    _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

    _balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

    _balancerBlock.Completion.ContinueWith(tsk =>
    {
        while (!_typeATickBlock.Completion.IsCompleted)
        {
            if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
            {
                _typeATickBlock.Complete();
            }
        }
    });

    _typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
    _printingBuffer.LinkTo(_typeATickBlock);
    _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());            

    _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
    _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

    _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}

Некоторое время назад я написал сообщение в блоге, блог больше не активен, об обработке завершения с помощью циклов обратной связи. Это может оказать дополнительную помощь. Получено с WayBackMachine.

Поиск завершения в сложном потоке: циклы обратной связи

...