У меня есть конвейер потока данных TPL с условным потоком сообщения, который может закончиться в любом из блоков действия, как показано на рисунке.
![enter image description here](https://i.stack.imgur.com/w20py.png)
У меня также для PropogateCompletion установлено значение True, это те же параметры ссылок, которые используются во всем конвейере.
Как только я запускаю Процесс, передавая контекст первому TransformBlock, он запускает контекст процесса, содержащий время начала и окончания и некоторые метрики того, сколько записей его обрабатывает.
Трубопроводный метод.
public void Process(CancellationToken token = default)
{
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
var exBlockOptions = new ExecutionDataflowBlockOptions
{CancellationToken = token, MaxDegreeOfParallelism = Environment.ProcessorCount};
_context.Start();
var setMessagesToStartStatus = new TransformBlock<IContext, IContext>(async context =>
{
await _mRepository.SetMessagesToStartStatus(token);
return context;
}, exBlockOptions);
var autoMapMessageReferences =
new TransformBlock<IContext, IContext>(async context =>
{
await _mRepository.SetAutoMapMessageReferences(token);
return context;
},
exBlockOptions);
var messagesToProcess = new TransformManyBlock<IContext, AwsomeMessage>(
async context =>
{
var r = await _mRepository.GetPendingMessagesToProcess(token);
return r.ToList();
}, exBlockOptions);
var errActionBlock =
new ActionBlock<AwsomeMessage>(async inMessage =>
{
await _mRepository.SetToErrorState(inMessage.MessageId, inMessage.ErrorMessage, token);
_errorWorks.Add(inMessage);
}, exBlockOptions);
var printActionBlock = new ActionBlock<AwsomeMessage>(inMessage =>
{
Console.WriteLine(
$"ID :{inMessage.MessageId}, Error?: {inMessage.IsInError}, ErrorString :: {inMessage.ErrorMessage}");
_processedWorks.Add(inMessage);
}, exBlockOptions);
var updateSourceSystem = EnrichSourceSystemTransBlock(token, exBlockOptions);
var detoxReasonUpdate = EnrichDetoxReasonTransBlock(exBlockOptions);
var awsomeActionMapping = EnrichAwsomeActionMappingTransBlock(token, exBlockOptions);
var lateReasonMapToWorkAction = EnrichLateReasonTransBlock(exBlockOptions);
var updateWithNarrationReason = EnrichReasonToOtherTransBlock(token, exBlockOptions);
var crossAmendedByAndOtherReason = EnrichCrossSystemTransBlock(exBlockOptions);
var PureEQDBookReplacement = EnrichPurexBookTransBlock(token, exBlockOptions);
var bookHierarchyEnrichMessage = EnrichBookHierarchyTransBlock(token, exBlockOptions);
//pipeline
setMessagesToStartStatus.LinkTo(autoMapMessageReferences, linkOptions);
autoMapMessageReferences.LinkTo(messagesToProcess, linkOptions);
messagesToProcess.LinkTo(updateSourceSystem, linkOptions);
updateSourceSystem.LinkTo(detoxReasonUpdate, linkOptions);
detoxReasonUpdate.LinkTo(awsomeActionMapping, linkOptions);
awsomeActionMapping.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
awsomeActionMapping.LinkTo(lateReasonMapToWorkAction, linkOptions, x => !x.IsInError);
lateReasonMapToWorkAction.LinkTo(updateWithNarrationReason, linkOptions);
updateWithNarrationReason.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
updateWithNarrationReason.LinkTo(crossAmendedByAndOtherReason, linkOptions, x => !x.IsInError);
crossAmendedByAndOtherReason.LinkTo(PureEQDBookReplacement, linkOptions);
PureEQDBookReplacement.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
PureEQDBookReplacement.LinkTo(bookHierarchyEnrichMessage, linkOptions, x => !x.IsInError);
bookHierarchyEnrichMessage.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
bookHierarchyEnrichMessage.LinkTo(printActionBlock, linkOptions, x => !x.IsInError);
//Create Context
setMessagesToStartStatus.Post(_context);
//Kick off complete. -- Mark the head to say its complete.
try
{
setMessagesToStartStatus.Complete();
printActionBlock.Completion.Wait(token);
errActionBlock.Completion.Wait(token);
_context.Complete();
}
catch (AggregateException exception)
{
Debug.WriteLine(exception.Message);
foreach (var exc in exception.InnerExceptions)
{
Debug.WriteLine($"Err Msg = {exc.Message}, Stack Work = {exc.StackTrace} ");
}
throw exception.InnerExceptions.Single();
}
}
Я попробовал Задачу. Когда-нибудь, как это просто не получается. это просто блокирует.
как распространение завершено верно, я попытался setMessagesToStartStatus.Complete.Wait ()
когда я делаю это, как только завершается, истина, он просто не ждет здесь, он говорит, что задание завершено и продолжается.
GetMessages из db - это TransformManyBlock. Когда я передаю тестовые данные только одного элемента, который идет в блок ошибок, он теперь переходит в состояние зависания, так как блок печати даже не запускается. так что полный эффект не повлияет. и другая проблема - это не ожидание.
ИСПРАВЛЕН со следующим кодом.
var completeFalseLinkOptions = new DataflowLinkOptions(){PropagateCompletion = false};
//Linking changes.
.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
awsomeActionMapping.LinkTo(lateReasonMapToWorkAction, linkOptions, x => !x.IsInError);
lateReasonMapToWorkAction.LinkTo(updateWithNarrationReason, linkOptions);
updateWithNarrationReason.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
updateWithNarrationReason.LinkTo(crossAmendedByAndOtherReason, linkOptions, x => !x.IsInError);
crossAmendedByAndOtherReason.LinkTo(PureEQDBookReplacement, linkOptions);
PureEQDBookReplacement.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
PureEQDBookReplacement.LinkTo(bookHierarchyEnrichMessage, linkOptions, x => !x.IsInError);
bookHierarchyEnrichMessage.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
bookHierarchyEnrichMessage.LinkTo(printActionBlock, linkOptions, x => !x.IsInError);
//and Complete Wait here.
setMessagesToStartStatus.Complete();
printActionBlock.Completion.Wait(token);
//manually complete the error block.
errActionBlock.Complete();
errActionBlock.Completion.Wait(token);
_context.Complete();