Эта проблема возникла при попытке реализовать предложенное решение этой проблемы .
Краткое описание проблемы
Выполнение вызова ReceiveAsync () из TransformBlock в WriteOnceBlock приводит к тому, что TransformBlock по существу удаляет себя из потока. Он перестает распространять любые сообщения, будь то данные или сигнал завершения.
Системный дизайн
Система предназначена для анализа больших файлов CSV через серию шагов.
Проблемную часть потока можно (неумело) визуализировать следующим образом:
Параллелограмм - это BufferBlock, ромбы - это BroadcastBlocks, треугольники - это WriteOnceBlocks, а стрелки - это TransformBlocks. Сплошные линии обозначают ссылку, созданную с помощью LinkTo (), а пунктирная линия представляет вызов ReceiveAsync () из ParsedHeaderAndRecordJoiner в блок ParsedHeaderContainer. Я знаю, что этот поток несколько неоптимален, но это не основная причина вопроса.
Код
Корень приложения
Вот часть класса, которая создает необходимые блоки и связывает их вместе с помощью PropagateCompletion
using (var cancellationSource = new CancellationTokenSource())
{
var cancellationToken = cancellationSource.Token;
var temporaryEntityInstance = new Card(); // Just as an example
var producerQueue = queueFactory.CreateQueue<string>(new DataflowBlockOptions{CancellationToken = cancellationToken});
var recordDistributor = distributorFactory.CreateDistributor<string>(s => (string)s.Clone(),
new DataflowBlockOptions { CancellationToken = cancellationToken });
var headerRowContainer = containerFactory.CreateContainer<string>(s => (string)s.Clone(),
new DataflowBlockOptions { CancellationToken = cancellationToken });
var headerRowParser = new HeaderRowParserFactory().CreateHeaderRowParser(temporaryEntityInstance.GetType(), ';',
new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
var parsedHeaderContainer = containerFactory.CreateContainer<HeaderParsingResult>(HeaderParsingResult.Clone,
new DataflowBlockOptions { CancellationToken = cancellationToken});
var parsedHeaderAndRecordJoiner = new ParsedHeaderAndRecordJoinerFactory().CreateParsedHeaderAndRecordJoiner(parsedHeaderContainer,
new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
var entityParser = new entityParserFactory().CreateEntityParser(temporaryEntityInstance.GetType(), ';',
dataflowBlockOptions: new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
var entityDistributor = distributorFactory.CreateDistributor<EntityParsingResult>(EntityParsingResult.Clone,
new DataflowBlockOptions{CancellationToken = cancellationToken});
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
// Producer subprocess
producerQueue.LinkTo(recordDistributor, linkOptions);
// Header subprocess
recordDistributor.LinkTo(headerRowContainer, linkOptions);
headerRowContainer.LinkTo(headerRowParser, linkOptions);
headerRowParser.LinkTo(parsedHeaderContainer, linkOptions);
parsedHeaderContainer.LinkTo(errorQueue, new DataflowLinkOptions{MaxMessages = 1, PropagateCompletion = true}, dataflowResult => !dataflowResult.WasSuccessful);
// Parsing subprocess
recordDistributor.LinkTo(parsedHeaderAndRecordJoiner, linkOptions);
parsedHeaderAndRecordJoiner.LinkTo(entityParser, linkOptions, joiningResult => joiningResult.WasSuccessful);
entityParser.LinkTo(entityDistributor, linkOptions);
entityDistributor.LinkTo(errorQueue, linkOptions, dataflowResult => !dataflowResult.WasSuccessful);
}
HeaderRowParser
Этот блок анализирует строку заголовка из файла CSV и выполняет некоторую проверку.
public class HeaderRowParserFactory
{
public TransformBlock<string, HeaderParsingResult> CreateHeaderRowParser(Type entityType,
char delimiter,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return new TransformBlock<string, HeaderParsingResult>(headerRow =>
{
// Set up some containers
var result = new HeaderParsingResult(identifier: "N/A", wasSuccessful: true);
var fieldIndexesByPropertyName = new Dictionary<string, int>();
// Get all serializable properties on the chosen entity type
var serializableProperties = entityType.GetProperties()
.Where(prop => prop.IsDefined(typeof(CsvFieldNameAttribute), false))
.ToList();
// Add their CSV fieldnames to the result
var entityFieldNames = serializableProperties.Select(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName);
result.SetEntityFieldNames(entityFieldNames);
// Create the dictionary of properties by field name
var serializablePropertiesByFieldName = serializableProperties.ToDictionary(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName, prop => prop, StringComparer.OrdinalIgnoreCase);
var fields = headerRow.Split(delimiter);
for (var i = 0; i < fields.Length; i++)
{
// If any field in the CSV is unknown as a serializable property, we return a failed result
if (!serializablePropertiesByFieldName.TryGetValue(fields[i], out var foundProperty))
{
result.Invalidate($"The header row contains a field that does not match any of the serializable properties - {fields[i]}.",
DataflowErrorSeverity.Critical);
return result;
}
// Perform a bunch more validation
fieldIndexesByPropertyName.Add(foundProperty.Name, i);
}
result.SetFieldIndexesByName(fieldIndexesByPropertyName);
return result;
}, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
}
}
ParsedHeaderAndRecordJoiner
Для каждой последующей записи, проходящей через канал, этот блок предназначен для извлечения проанализированных данных заголовка и добавления их в запись.
public class ParsedHeaderAndRecordJoinerFactory
{
public TransformBlock<string, HeaderAndRecordJoiningResult> CreateParsedHeaderAndRecordJoiner(WriteOnceBlock<HeaderParsingResult> parsedHeaderContainer,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return new TransformBlock<string, HeaderAndRecordJoiningResult>(async csvRecord =>
{
var headerParsingResult = await parsedHeaderContainer.ReceiveAsync();
// If the header couldn't be parsed, a critical error is already on its way to the failure logger so we don't need to continue
if (!headerParsingResult.WasSuccessful) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);
// The entity parser can't do anything with the header record, so we send a message with wasSuccessful false
var isHeaderRecord = true;
foreach (var entityFieldName in headerParsingResult.EntityFieldNames)
{
isHeaderRecord &= csvRecord.Contains(entityFieldName);
}
if (isHeaderRecord) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);
return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: true, headerParsingResult, csvRecord);
}, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
}
}
Деталь проблемы
В текущей реализации ParsedHeaderAndRecordJoiner правильно получает данные из вызова ReceiveAsync () в ParsedHeaderContainer и возвращает ожидаемый результат, однако сообщение не поступает в EntityParser.
Кроме того, когда сигнал Complete отправляется в начало потока (ProducerQueue), он распространяется на RecordDistributor, но затем останавливается в ParsedHeaderAndRecordJoiner (он продолжает с HeaderRowContainer и далее, поэтому RecordDistributor передает его дальше) ,
Если я удаляю вызов ReceiveAsync () и сам макетирую данные, блок ведет себя как ожидалось.