ActionBlock.Complete.Wait (токен) не ожидает завершения конвейера - PullRequest
0 голосов
/ 22 мая 2019

У меня есть конвейер потока данных TPL с условным потоком сообщения, который может закончиться в любом из блоков действия, как показано на рисунке. enter image description here

У меня также для PropogateCompletion установлено значение True, это те же параметры ссылок, которые используются во всем конвейере.

Как только я запускаю Процесс, передавая контекст первому TransformBlock, он запускает контекст процесса, содержащий время начала и окончания и некоторые метрики того, сколько записей его обрабатывает.

Трубопроводный метод.

public void  Process(CancellationToken token = default)
        {
            var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
            var exBlockOptions = new ExecutionDataflowBlockOptions
                {CancellationToken = token, MaxDegreeOfParallelism = Environment.ProcessorCount};
            _context.Start();

            var setMessagesToStartStatus = new TransformBlock<IContext, IContext>(async context =>
            {
                await _mRepository.SetMessagesToStartStatus(token);
                return context;
            }, exBlockOptions);
            var autoMapMessageReferences =
                new TransformBlock<IContext, IContext>(async context =>
                    {
                        await _mRepository.SetAutoMapMessageReferences(token);
                        return context;
                    },
                    exBlockOptions);
            var messagesToProcess = new TransformManyBlock<IContext, AwsomeMessage>(
                async context =>
                {
                    var r = await _mRepository.GetPendingMessagesToProcess(token);
                    return r.ToList();
                }, exBlockOptions);
            var errActionBlock =
                new ActionBlock<AwsomeMessage>(async inMessage =>
                {
                    await _mRepository.SetToErrorState(inMessage.MessageId, inMessage.ErrorMessage, token);
                    _errorWorks.Add(inMessage);
                }, exBlockOptions);

            var printActionBlock = new ActionBlock<AwsomeMessage>(inMessage =>
            {

                Console.WriteLine(
                    $"ID :{inMessage.MessageId}, Error?: {inMessage.IsInError}, ErrorString :: {inMessage.ErrorMessage}");
                _processedWorks.Add(inMessage);
            }, exBlockOptions);
            var updateSourceSystem = EnrichSourceSystemTransBlock(token, exBlockOptions);
            var detoxReasonUpdate = EnrichDetoxReasonTransBlock(exBlockOptions);
            var awsomeActionMapping = EnrichAwsomeActionMappingTransBlock(token, exBlockOptions);
            var lateReasonMapToWorkAction = EnrichLateReasonTransBlock(exBlockOptions);
            var updateWithNarrationReason = EnrichReasonToOtherTransBlock(token, exBlockOptions);
            var crossAmendedByAndOtherReason = EnrichCrossSystemTransBlock(exBlockOptions);
            var PureEQDBookReplacement = EnrichPurexBookTransBlock(token, exBlockOptions);
            var bookHierarchyEnrichMessage = EnrichBookHierarchyTransBlock(token, exBlockOptions);

            //pipeline
            setMessagesToStartStatus.LinkTo(autoMapMessageReferences, linkOptions);
            autoMapMessageReferences.LinkTo(messagesToProcess, linkOptions);
            messagesToProcess.LinkTo(updateSourceSystem, linkOptions);
            updateSourceSystem.LinkTo(detoxReasonUpdate, linkOptions);
            detoxReasonUpdate.LinkTo(awsomeActionMapping, linkOptions);
            awsomeActionMapping.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
            awsomeActionMapping.LinkTo(lateReasonMapToWorkAction, linkOptions, x => !x.IsInError);
            lateReasonMapToWorkAction.LinkTo(updateWithNarrationReason, linkOptions);
            updateWithNarrationReason.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
            updateWithNarrationReason.LinkTo(crossAmendedByAndOtherReason, linkOptions, x => !x.IsInError);
            crossAmendedByAndOtherReason.LinkTo(PureEQDBookReplacement, linkOptions);
            PureEQDBookReplacement.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
            PureEQDBookReplacement.LinkTo(bookHierarchyEnrichMessage, linkOptions, x => !x.IsInError);
            bookHierarchyEnrichMessage.LinkTo(errActionBlock, linkOptions, x => x.IsInError);
            bookHierarchyEnrichMessage.LinkTo(printActionBlock, linkOptions, x => !x.IsInError);


            //Create Context 
            setMessagesToStartStatus.Post(_context);

            //Kick off complete. -- Mark the head to say its complete.
            try
            {
                setMessagesToStartStatus.Complete();
                printActionBlock.Completion.Wait(token);
                errActionBlock.Completion.Wait(token);
                _context.Complete();
            }
            catch (AggregateException exception)
            {
                Debug.WriteLine(exception.Message);
                foreach (var exc in exception.InnerExceptions)
                {
                    Debug.WriteLine($"Err Msg = {exc.Message}, Stack Work = {exc.StackTrace} ");
                }

                throw exception.InnerExceptions.Single();
            }
        }

Я попробовал Задачу. Когда-нибудь, как это просто не получается. это просто блокирует. как распространение завершено верно, я попытался setMessagesToStartStatus.Complete.Wait ()

когда я делаю это, как только завершается, истина, он просто не ждет здесь, он говорит, что задание завершено и продолжается.

GetMessages из db - это TransformManyBlock. Когда я передаю тестовые данные только одного элемента, который идет в блок ошибок, он теперь переходит в состояние зависания, так как блок печати даже не запускается. так что полный эффект не повлияет. и другая проблема - это не ожидание.

ИСПРАВЛЕН со следующим кодом.

 var completeFalseLinkOptions = new DataflowLinkOptions(){PropagateCompletion = false};

    //Linking changes.
    .LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
        awsomeActionMapping.LinkTo(lateReasonMapToWorkAction, linkOptions, x => !x.IsInError);
        lateReasonMapToWorkAction.LinkTo(updateWithNarrationReason, linkOptions);
        updateWithNarrationReason.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
        updateWithNarrationReason.LinkTo(crossAmendedByAndOtherReason, linkOptions, x => !x.IsInError);
        crossAmendedByAndOtherReason.LinkTo(PureEQDBookReplacement, linkOptions);
        PureEQDBookReplacement.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
        PureEQDBookReplacement.LinkTo(bookHierarchyEnrichMessage, linkOptions, x => !x.IsInError);
        bookHierarchyEnrichMessage.LinkTo(errActionBlock, completeFalseLinkOptions, x => x.IsInError);
        bookHierarchyEnrichMessage.LinkTo(printActionBlock, linkOptions, x => !x.IsInError);

//and Complete Wait here.

setMessagesToStartStatus.Complete();
printActionBlock.Completion.Wait(token);
//manually complete the error block.
errActionBlock.Complete();
errActionBlock.Completion.Wait(token);
_context.Complete();
...