ReceiveAsync прерывает / прерывает передачу сообщений - PullRequest
0 голосов
/ 07 сентября 2018

Эта проблема возникла при попытке реализовать предложенное решение этой проблемы .

Краткое описание проблемы

Выполнение вызова ReceiveAsync () из TransformBlock в WriteOnceBlock приводит к тому, что TransformBlock по существу удаляет себя из потока. Он перестает распространять любые сообщения, будь то данные или сигнал завершения.

Системный дизайн

Система предназначена для анализа больших файлов CSV через серию шагов.

Проблемную часть потока можно (неумело) визуализировать следующим образом:

Partial data flow

Параллелограмм - это BufferBlock, ромбы - это BroadcastBlocks, треугольники - это WriteOnceBlocks, а стрелки - это TransformBlocks. Сплошные линии обозначают ссылку, созданную с помощью LinkTo (), а пунктирная линия представляет вызов ReceiveAsync () из ParsedHeaderAndRecordJoiner в блок ParsedHeaderContainer. Я знаю, что этот поток несколько неоптимален, но это не основная причина вопроса.

Код

Корень приложения

Вот часть класса, которая создает необходимые блоки и связывает их вместе с помощью PropagateCompletion

using (var cancellationSource = new CancellationTokenSource())
{
    var cancellationToken = cancellationSource.Token;
    var temporaryEntityInstance = new Card(); // Just as an example

    var producerQueue = queueFactory.CreateQueue<string>(new DataflowBlockOptions{CancellationToken = cancellationToken});
    var recordDistributor = distributorFactory.CreateDistributor<string>(s => (string)s.Clone(), 
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowContainer = containerFactory.CreateContainer<string>(s => (string)s.Clone(), 
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowParser = new HeaderRowParserFactory().CreateHeaderRowParser(temporaryEntityInstance.GetType(), ';', 
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var parsedHeaderContainer = containerFactory.CreateContainer<HeaderParsingResult>(HeaderParsingResult.Clone, 
        new DataflowBlockOptions { CancellationToken = cancellationToken});
    var parsedHeaderAndRecordJoiner = new ParsedHeaderAndRecordJoinerFactory().CreateParsedHeaderAndRecordJoiner(parsedHeaderContainer, 
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityParser = new entityParserFactory().CreateEntityParser(temporaryEntityInstance.GetType(), ';',
        dataflowBlockOptions: new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityDistributor = distributorFactory.CreateDistributor<EntityParsingResult>(EntityParsingResult.Clone, 
        new DataflowBlockOptions{CancellationToken = cancellationToken});

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    // Producer subprocess
    producerQueue.LinkTo(recordDistributor, linkOptions);

    // Header subprocess
    recordDistributor.LinkTo(headerRowContainer, linkOptions);
    headerRowContainer.LinkTo(headerRowParser, linkOptions);
    headerRowParser.LinkTo(parsedHeaderContainer, linkOptions);
    parsedHeaderContainer.LinkTo(errorQueue, new DataflowLinkOptions{MaxMessages = 1, PropagateCompletion = true}, dataflowResult => !dataflowResult.WasSuccessful);

    // Parsing subprocess
    recordDistributor.LinkTo(parsedHeaderAndRecordJoiner, linkOptions);
    parsedHeaderAndRecordJoiner.LinkTo(entityParser, linkOptions, joiningResult => joiningResult.WasSuccessful);
    entityParser.LinkTo(entityDistributor, linkOptions);
    entityDistributor.LinkTo(errorQueue, linkOptions, dataflowResult => !dataflowResult.WasSuccessful);
}

HeaderRowParser

Этот блок анализирует строку заголовка из файла CSV и выполняет некоторую проверку.

public class HeaderRowParserFactory
{
    public TransformBlock<string, HeaderParsingResult> CreateHeaderRowParser(Type entityType,
        char delimiter,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderParsingResult>(headerRow =>
        {
            // Set up some containers
            var result = new HeaderParsingResult(identifier: "N/A", wasSuccessful: true);
            var fieldIndexesByPropertyName = new Dictionary<string, int>();

            // Get all serializable properties on the chosen entity type
            var serializableProperties = entityType.GetProperties()
                .Where(prop => prop.IsDefined(typeof(CsvFieldNameAttribute), false))
                .ToList();

            // Add their CSV fieldnames to the result
            var entityFieldNames = serializableProperties.Select(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName);
            result.SetEntityFieldNames(entityFieldNames);

            // Create the dictionary of properties by field name
            var serializablePropertiesByFieldName = serializableProperties.ToDictionary(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName, prop => prop, StringComparer.OrdinalIgnoreCase);

            var fields = headerRow.Split(delimiter);

            for (var i = 0; i < fields.Length; i++)
            {
                // If any field in the CSV is unknown as a serializable property, we return a failed result
                if (!serializablePropertiesByFieldName.TryGetValue(fields[i], out var foundProperty))
                {
                    result.Invalidate($"The header row contains a field that does not match any of the serializable properties - {fields[i]}.",
                        DataflowErrorSeverity.Critical);
                    return result;
                }

                // Perform a bunch more validation

                fieldIndexesByPropertyName.Add(foundProperty.Name, i);
            }

            result.SetFieldIndexesByName(fieldIndexesByPropertyName);
            return result;
        }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}

ParsedHeaderAndRecordJoiner

Для каждой последующей записи, проходящей через канал, этот блок предназначен для извлечения проанализированных данных заголовка и добавления их в запись.

public class ParsedHeaderAndRecordJoinerFactory
{
    public TransformBlock<string, HeaderAndRecordJoiningResult> CreateParsedHeaderAndRecordJoiner(WriteOnceBlock<HeaderParsingResult> parsedHeaderContainer, 
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderAndRecordJoiningResult>(async csvRecord =>
            {
                var headerParsingResult = await parsedHeaderContainer.ReceiveAsync();

                // If the header couldn't be parsed, a critical error is already on its way to the failure logger so we don't need to continue
                if (!headerParsingResult.WasSuccessful) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

                // The entity parser can't do anything with the header record, so we send a message with wasSuccessful false
                var isHeaderRecord = true;
                foreach (var entityFieldName in headerParsingResult.EntityFieldNames)
                {
                    isHeaderRecord &= csvRecord.Contains(entityFieldName);
                }
                if (isHeaderRecord) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

                return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: true, headerParsingResult, csvRecord);
            }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}

Деталь проблемы

В текущей реализации ParsedHeaderAndRecordJoiner правильно получает данные из вызова ReceiveAsync () в ParsedHeaderContainer и возвращает ожидаемый результат, однако сообщение не поступает в EntityParser.

Кроме того, когда сигнал Complete отправляется в начало потока (ProducerQueue), он распространяется на RecordDistributor, но затем останавливается в ParsedHeaderAndRecordJoiner (он продолжает с HeaderRowContainer и далее, поэтому RecordDistributor передает его дальше) ,

Если я удаляю вызов ReceiveAsync () и сам макетирую данные, блок ведет себя как ожидалось.

1 Ответ

0 голосов
/ 07 сентября 2018

Я думаю, что эта часть является ключевой

, однако сообщение не поступает в EntityParser.

На основании примера единственный способ EntityParser не получитьсообщение ParsedHeaderAndRecordJoiner выводится, когда WasSuccessful возвращает false.Предикат, используемый в вашей ссылке, исключает ошибочные сообщения, но эти сообщения не имеют куда идти, поэтому они накапливаются в выходном буфере ParsedHeaderAndRecordJoiner и также предотвращают распространение Completion.Вам нужно связать нулевую цель, чтобы вывести сбойные сообщения.

parsedHeaderAndRecordJoiner.LinkTo(DataflowBlock.NullTarget<HeaderParsingResult>());

Далее, если ваши ложные данные всегда возвращаются с WasSuccessful true, тогда это может указывать на await ...ReceiveAsync()

Не обязательно курящий пистолет, но хорошее место для старта.Можете ли вы подтвердить состояние всех сообщений в выходном буфере ParsedHeaderAndRecordJoiner, когда конвейер застревает.

...