Вот простой пример кода с использованием TransformBlock
и ActionBlock
. Я использую библиотеку Polly, чтобы помочь с повторной попыткой logi c. Проблема в том, что как только я вручную открываю CircuitBreaker
, а затем снова закрываю его, связь между downloadBlock
и actionBlock
разрывается. Этого не произойдет, если я опущу предикат установки в вызове LinkTo
:
private readonly TransformBlock<DataClass, DataClass> downloadBlock;
private readonly ActionBlock<DataClass> actionBlock;
private readonly AsyncCircuitBreakerPolicy circuitBreaker;
private readonly AsyncRetryPolicy retryPolicy;
retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
circuitBreaker = Policy.Handle<WebException>().CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));
downloadBlock = new TransformBlock<DataClass, DataClass>(async (data) =>
{
var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);
try
{
await finalPolicy.ExecuteAsync(async () =>
{
//await DoSomething();
data.Status = Status.Completed;
});
}
catch (WebException we)
{
data.Status = Status.Failed;
//Do logging
}
catch (BrokenCircuitException)
{
data.Status = Status.Failed;
//Do logging
}
return data;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
actionBlock = new ActionBlock<DataClass>((data) =>
{
//DoSomething(data);
}});
//Here, if I set data => data.Status != Status.Failed and then at some point I manually call
//circuitBreaker.Isolate() and then after some time circuitBreaker.Reset() to manually close the circuit again,
//items newly pushed into the pipeline are being processed in the downloadBlock but the results
//are not propagated to actionBlock even if their status is OK. This does not happen if ommit setting the predicate.
downloadBlock.LinkTo(actionBlock,
new DataflowLinkOptions { PropagateCompletion = true }, data => data.Status != Status.Failed);
Я мог бы, конечно, проверить статус в самом actionBlock
, но я хотел бы знать, есть ли у кого-нибудь столкнулся с таким поведением и в чем может быть причина?
EDIT: Вот шаги, необходимые для воспроизведения проблемы:
- pu sh данные в конвейер, обработать их,
- вызвать circuitBreaker.Isolate (),
- pu sh новые данные в конвейере, они попадают в BrokenCircuitException и логически не переходят к следующему блоку из-за сказуемого. Это нормально.
- вызовите circuitBreaker.Reset (). Теперь все должно работать.
- pu sh новые данные в конвейере, они обрабатываются в блоке загрузки, его статус в порядке, но он просто не распространяется на следующий ActionBlock