Я создал нечто похожее на веб-сканер для создания отчета о 1000+ веб-сервисах, которыми я должен управлять.Поэтому я создал TPL Dataflow Pipeline для управления получением и обработкой данных.Конвейер, который я представлял, выглядит примерно так (извините за мои навыки рисования: D):
Я уже создал реализацию, и все работало нормально, пока я не запустил свой конвейер какцелое.Я передал 500 объектов в конвейер в качестве входных данных в конвейер и ожидал, что программа будет работать некоторое время, но программа остановилась после перехода в блок выполнения.После проверки хода выполнения программы мне показалось, что Завершение быстро распространяется к Блоку утилизации.Я создал небольшой пример проекта с тем же конвейером, чтобы проверить, была ли это моя реализация классов ввода или самого конвейера.Пример кода:
public class Job
{
public int Ticker { get; set; }
public Type Type { get; }
public Job(Type type)
{
Type = type;
}
public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}
public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}
public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}
public bool IsFinished()
{
Console.WriteLine("Trying to finish");
return Ticker == 1000000;
}
public void IntermediateCleanUp()
{
Console.WriteLine("intermediate Cleanup");
Ticker = Ticker - 120;
}
public void finalCleanUp()
{
Console.WriteLine("Final Cleanup");
Ticker = -1;
}
}
Это мой класс ввода, который введен в блок подготовки.
public class Dataflow
{
private TransformBlock<Job, Job> _preparationsBlock;
private BufferBlock<Job> _balancerBlock;
private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4
};
private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
private TransformBlock<Job, Job> _typeATickBlock;
private TransformBlock<Job, Job> _typeBTickBlock;
private TransformBlock<Job, Job> _writeBlock;
private TransformBlock<Job, Job> _intermediateCleanupBlock;
private ActionBlock<Job> _finalCleanupBlock;
public async Task Process()
{
CreateBlocks();
ConfigureBlocks();
for (int i = 0; i < 500; i++)
{
await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
}
_preparationsBlock.Complete();
await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
}
private void CreateBlocks()
{
_preparationsBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Prepare();
return job;
}, _options);
_balancerBlock = new BufferBlock<Job>(_options);
_typeATickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
return job;
}, _options);
_typeBTickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
await job.Tick();
return job;
}, _options);
_writeBlock = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine(job.Ticker);
return job;
}, _options);
_finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);
_intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
{
job.IntermediateCleanUp();
return job;
}, _options);
}
private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);
_balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
_balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);
_typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
_typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());
_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
}
Это мой конвейер потока данных, представляющий мою "иллюстрацию" выше: D,Все это выполняется в моем планировщике, который запускается в Programm.cs:
public class Scheduler
{
private readonly Timer _timer;
private readonly Dataflow _flow;
public Scheduler(int intervall)
{
_timer = new Timer(intervall);
_flow = new Dataflow();
}
public void Start()
{
_timer.AutoReset = false;
_timer.Elapsed += _timer_Elapsed;
_timer.Start();
}
private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
_timer.Stop();
Console.WriteLine("Timer stopped");
await _flow.Process().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Console.WriteLine("Timer started again.");
_timer.Start();
}
}
}
class Program
{
static void Main(string[] args)
{
var scheduler = new Scheduler(1000);
scheduler.Start();
Console.ReadKey();
}
}
Вывод на консоль, который я получаю: Таймер остановлен. Подготовка к тикированию. Попытка к фиксации. Попытка завершить тику. Попытка к фиксации.закончить тикирование Попытка зафиксировать Попытка завершить тикирование Попытка завершить Попытка завершить тикирование Попытка завершить Попытка завершить тикирование Попытка завершить тикирование Попытка завершить тикаTicking Попытка фиксации Попытка завершить Попытка фиксации Попытка завершить
Кажется, что программа перестала работать в этот момент, потому что я не достигаю каких-либо точек останова или продолжаю работу.Я думаю, что все мои блоки уже получили сигнал завершения и поэтому перестают принимать любые новые предметы.Поэтому мой вопрос таков: как мне управлять сигналом завершения, чтобы конвейер заканчивался только тогда, когда больше нет работы?