Эту проблему действительно сложно отладить, она не всегда возникает (не происходит за короткое время, чтобы я мог легко отлаживать код) и похоже, что ни у кого не было подобной проблемы, как эта? (Я часами гуглил, но не нашел ничего, связанного с этой проблемой).
Короче говоря, моя сеть потока данных в какой-то момент работает нормально, пока я не обнаружу, что клеммный блок (который обновляет пользовательский интерфейс) кажется чтобы перестать работать (новые данные не обновляются в пользовательском интерфейсе), тогда как все блоки восходящего потока данных по-прежнему работают нормально. Это похоже на некоторую разобщенность между другими блоками и блоком ui.
Вот моя подробная сеть потоков данных, давайте сначала проверим, прежде чем я собираюсь подробнее объяснить проблему:
//the network graph first
[raw data block]
-> [switching block] -> [data counting block]
-> [processing block] -> [ok result block] -> [completion monitoring]
-> [not ok result block] -> [completion monitoring]
//in the UI code behind where I can consume the network and plug-in some other blocks for updating
//like this:
[ok result block] -> [ok result counting block]
[not ok result block] -> [other ui updating]
Блок [ok result block]
- это BroadcastBlock
, который помещает результат в [ok result counting block]
. Проблема, которую я частично описал здесь, заключается в том, что [ok result counting block]
, похоже, отключен от [ok result block]
.
var options = new DataflowBlockOptions { EnsureOrdered = false };
var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 80 };
//[raw data block]
var rawDataBlock = new BufferBlock<Input>(options);
//[switching block]
var switchingBlock = new TransformManyBlock<Input,Input>(e => new[] {e,null});
//[data counting block]
var dataCountingBlock = new BroadcastBlock<Input>(null);
//[processing block]
var processingBlock = new TransformBlock<Input,int>(async e => {
//call another api to compute the result
var result = await …;
//rollback the input for later processing (some kind of retry)
if(result < 0){
//per my logging, there is only one call dropping
//in this case
Task.Run(rollback);
}
//local function to rollback
async Task rollback(){
await rawDataBlock.SendAsync(e).ConfigureAwait(false);
}
return result;
}, execOptions);
//[ok result block]
var okResultBlock = new BroadcastBlock<int>(null, options);
//[not ok result block]
var notOkResultBlock = new BroadcastBlock<int>(null, options);
//[completion monitoring]
var completionMonitoringBlock = new ActionBlock<int>(e => {
if(rawDataBlock.Completion.IsCompleted && processingBlock.InputCount == 0){
processingBlock.Complete();
}
}, execOptions);
//connect the blocks to build the network
rawDataBlock.LinkTo(switchingBlock);
switchingBlock.LinkTo(processingBlock, e => e != null);
switchingBlock.LinkTo(dataCountingBlock, e => e == null);
processingBlock.LinkTo(okResultBlock, e => e >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e < 9);
okResultBlock.LinkTo(completionMonitoringBlock);
notOkResultBlock.LinkTo(completionMonitoringBlock);
В коде пользовательского интерфейса я подключаю некоторые другие блоки пользовательского интерфейса для обновления информации. Здесь я использую WPF
, но я думаю, что здесь это не имеет значения:
var uiBlockOptions = new ExecutionDataflowBlockOptions {
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
};
dataCountingBlock.LinkTo(new ActionBlock<int>(e => {
//these are properties in the VM class, which is bound to the UI (xaml view)
RawInputCount++;
}, uiBlockOptions));
okResultBlock.LinkTo(new ActionBlock<int>(e => {
//these are properties in the VM class, which is bound to the UI (xaml view)
ProcessedCount++;
OkResultCount++;
}, uiBlockOptions));
notOkResultBlock.LinkTo(new ActionBlock<int>(e => {
//these are properties in the VM class, which is bound to the UI (xaml view)
ProcessedCount++;
PendingCount = processingBlock.InputCount;
}, uiBlockOptions));
У меня есть код, отслеживающий статус завершения блоков: rawDataBlock
, processingBlock
, okResultBlock
, notOkResultBlock
. У меня также есть другой код регистрации внутри processingBlock
, чтобы помочь диагностировать.
Итак, как я уже сказал, через довольно долгое время (около 1 часа с обработкой около 600K элементов, на самом деле это число ничего не говорит о проблеме, это может быть случайным образом) сеть, кажется, все еще работает нормально, за исключением того, что некоторые счетчики (нормальный результат, не нормальный результат) не обновляются, как если бы okResultBlock
и notOkResultBlock
были отключены от processingBlock
ИЛИ они были отключены из блоков пользовательского интерфейса (который обновляет пользовательский интерфейс). Я гарантирую, что processingBlock
все еще работает (исключение не регистрируется и результаты все еще записываются в файл), dataCountingBlock
все еще работает хорошо (с новым счетчиком, обновленным в пользовательском интерфейсе), все блоки processingBlock
, okResultBlock
, notOkResultBlock
не завершены (их завершение - это .ContinueWith
задача, которая выдает статус и ничего не регистрируется).
Так что это действительно застряло. Я понятия не имею, почему это могло перестать так работать. Это могло произойти только при использовании библиотеки черного ящика, например TPL Dataflow
. Я знаю, что вам также может быть трудно диагностировать, вообразить и подумать о возможностях. Я просто попросил здесь предложения по решению этой проблемы, а также ваш общий опыт (о подобных проблемах) и, возможно, некоторые предположения о том, что может вызвать такого рода проблему в TPL Dataflow
ОБНОВЛЕНИЕ :
Я успешно воспроизвел ошибку еще раз, прежде чем я подготовил код для записи некоторой информации, которая поможет отладке. Проблема теперь сводится к этому моменту: processingBlock
каким-то образом фактически не отправляет / отправляет / не отправляет сообщения всем связанным блокам (включая okResultBlock
и notOkResultBlock
) И даже новый блок (с добавлением * 1049 в начале * имеющий Append
false), связанный с ним, не мог получить никакого сообщения (результат). Как я уже сказал, processBlock
, похоже, все еще работает нормально (его Action
запускает код внутри и обычно производит регистрацию результатов). Так что это все еще очень странная проблема.
Короче говоря, проблема теперь в том, почему processBlock
не может отправлять / публиковать свои сообщения во все другие связанные блоки? Есть ли какая-то возможная причина для этого? Как узнать, успешно ли связаны блоки (после звонка на .LinkTo
)?