Почему Polly AsyncCircuitBreakerPolicy разрывает связи между блоками потока данных TPL? - PullRequest
1 голос
/ 14 июля 2020

Вот простой пример кода с использованием TransformBlock и ActionBlock. Я использую библиотеку Polly, чтобы помочь с повторной попыткой logi c. Проблема в том, что как только я вручную открываю CircuitBreaker, а затем снова закрываю его, связь между downloadBlock и actionBlock разрывается. Этого не произойдет, если я опущу предикат установки в вызове LinkTo:

private readonly TransformBlock<DataClass, DataClass> downloadBlock;
private readonly ActionBlock<DataClass> actionBlock;
private readonly AsyncCircuitBreakerPolicy circuitBreaker;
private readonly AsyncRetryPolicy retryPolicy;

retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
circuitBreaker = Policy.Handle<WebException>().CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));

downloadBlock = new TransformBlock<DataClass, DataClass>(async (data) =>
{
    var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);
    try
    {
        await finalPolicy.ExecuteAsync(async () =>
        {
             //await DoSomething();
             data.Status = Status.Completed;
        });
    }
    catch (WebException we)
    {
         data.Status = Status.Failed;
         //Do logging
    }
    catch (BrokenCircuitException)
    {
         data.Status = Status.Failed;
         //Do logging
    }
    return data;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });


actionBlock = new ActionBlock<DataClass>((data) =>
{
   //DoSomething(data);
}});
//Here, if I set data => data.Status != Status.Failed and then at some point I manually call
//circuitBreaker.Isolate() and then after some time circuitBreaker.Reset() to manually close the circuit again,
//items newly pushed into the pipeline are being processed in the downloadBlock but the results
//are not propagated to actionBlock even if their status is OK. This does not happen if ommit setting the predicate.

downloadBlock.LinkTo(actionBlock,
                     new DataflowLinkOptions { PropagateCompletion = true }, data => data.Status != Status.Failed);

Я мог бы, конечно, проверить статус в самом actionBlock, но я хотел бы знать, есть ли у кого-нибудь столкнулся с таким поведением и в чем может быть причина?

EDIT: Вот шаги, необходимые для воспроизведения проблемы:

  1. pu sh данные в конвейер, обработать их,
  2. вызвать circuitBreaker.Isolate (),
  3. pu sh новые данные в конвейере, они попадают в BrokenCircuitException и логически не переходят к следующему блоку из-за сказуемого. Это нормально.
  4. вызовите circuitBreaker.Reset (). Теперь все должно работать.
  5. pu sh новые данные в конвейере, они обрабатываются в блоке загрузки, его статус в порядке, но он просто не распространяется на следующий ActionBlock
...