Как остановить количество вакансий, размещенных в конвейере TPL, с помощью Polly - PullRequest
1 голос
/ 13 июля 2020

Я изучаю разные способы реализации конвейера TPL DataFlow. Пожалуйста, следуйте образцу кода со всеми комментариями, чтобы понять мой вопрос. Вот два простых блока:

//DataClass is very simple class with two properties int Id, and enum Status
var downloadBlock = new TransformBlock<DataClass, DataClass>((data) =>
{
    //Here I use Polly library to help with retrying when exception has occured
    //I have chosen to retry only on WebException, because in this case there is 
    //no need to try ObjectNotFoundException
    var policy = Policy.Handle<WebException>().WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(1));
    try
    {
        policy.Execute(() =>
        {
            //ThisMethodMyThrowWebException();
            data.Status = Status.Completed;
        });
    }
    catch (ObjectNotFoundException)
    {
        data.Status = Status.Failed;
        //Notify user that object is not found;
    }
    catch (WebException we)
    {
        data.Status = Status.Failed;
        //I would like to cancel whole batch of jobs that were sent to pipeline.
    }
    return data;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });


var actionBlock = new ActionBlock<DataClass>((data) =>
{
    if (data.Status != Status.Failed)
    {
        //Do something with data
    }
});

downloadBlock.LinkTo(actionBlock);

Допустим, в конвейер помещено большое количество элементов и произошла постоянная ошибка. Пройдет некоторое время, пока все элементы безуспешно выполнят свои попытки. Вместо этого я хотел бы остановить их обработку, когда будет достигнут определенный порог количества повторов. Чтобы достичь этого, я мог бы передать CancellationTokenSource каждому Block, но моя цель - создать конвейер, который будет работать до завершения работы моего приложения. Насколько я знаю, если я отменю Block, мой конвейер исчезнет. Конечно, я мог бы передать токен для делегирования внутри Execute(), но это остановит только внутри методов обработки. Есть идеи, как добиться этого с помощью библиотеки Polly?

1 Ответ

2 голосов
/ 15 июля 2020

С помощью в разделе комментариев я пришел к выводу, что ответ довольно прост - мне просто нужно передать CancellationToken в policy.Execute() метод, например:

CancellationTokenSource cts = new CancellationTokenSource();
policy.Execute(p =>
{
    //ThisMethodMyThrowWebException();
    data.Status = Status.Completed;
}, cts.Token);

РЕДАКТИРОВАТЬ: По запросу других пользователей я обновляю свой ответ полным образцом кода, который пытается ответить на вопросы в разделе комментариев.

Это немного длинновато, но по порядку чтобы справиться с OperationCancelledException, мне нужно было придумать logi c, как различать guish между исключением, которое выбрасывается из метода (и, следовательно, его следует повторить), и тем, которое было выброшено, потому что пользователь решил отменить job.

Я также решил включить дополнительный код с CircuitBreaker, потому что без него этот ответ был бы неполным, на мой взгляд. Итак, здесь мы go!

private TransformBlock<ConstructData, ConstructData> _downloadBlock;
private AsyncCircuitBreakerPolicy _circuitBreaker;
//This flag will indicate to circuitBreaker and catch block
//that OperationCanceledException was induced by user and 
//therefore it should not be looked upon as worthy by circuitBreaker
//to decide when to open circuit.
private bool _downloadCanceledByUser;
private CancellationTokenSource _downloadCts;
private void CreatePipeline()
{
    var failedItemQueue = new Queue<ConstructData>();

    _circuitBreaker
        = Policy.Handle<WebException>()
                .Or<OperationCanceledException>(ex => !_downloadCanceledByUser)
                .CircuitBreakerAsync(5, TimeSpan.FromDays(5),
                 (exception, timespan) => { },
                 () =>
                 {
                     while (failedItemQueue.Count > 0)
                     {
                         _downloadBlock.Post(failedItemQueue.Dequeue());
                     }
                 });

    var retryPolicy = Policy.Handle<WebException>().Or<OperationCanceledException>()
        .WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(1));

    #region DownloadBlock
    _downloadBlock = new TransformBlock<ConstructData, ConstructData>(async (construct) =>
    {
        var downloadPolicy = retryPolicy.WrapAsync(circuitBreaker);
        try
        {
            await downloadPolicy.ExecuteAsync(async fp =>
            {
                //await MethodThatMayThrowWebException(_downloadCts.Token);
                //await MethodThatMayThrowOperationCanceledException(_downloadCts.Token);
                construct.Status = DownloadFileStatus.Downloaded;
            }, _downloadCts.Token);
        }

        catch (WebException ex)
        {
            construct.Status = DownloadFileStatus.Failed;
            //Here after 2 failed retries guaranteed by retryPolicy,
            //I repost failed item back to the queue. 
            _downloadBlock.Post(construct);
        }
        catch (OperationCanceledException)
        {
            construct.Status = DownloadFileStatus.Canceled;
            //In case if OperationCanceledException was thrown from 
            //MethodThatMayThrowOperationCanceledException()
            //item gets posted back to downloadBlock for retry.
            if (!_downloadCanceledByUser)
                _downloadBlock.Post(construct);
        }
        //When total exception count reaches circuitBreaker threshold, circuit is left in open state
        catch (BrokenCircuitException ex)
        {
            //TODO Notify user that he needs to check internet connection and press reload.
            construct.Status = DownloadFileStatus.Failed;
            failedItemQueue.Enqueue(construct);
        }
        return construct;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    #endregion DownloadBlock

    #region ProcessBlock

    var processBlock = new ActionBlock<ConstructData>(construct =>
    {
        if (construct.Status == DownloadFileStatus.Downloaded)
        {
            var fullName = string.Concat(construct.Path, construct.Name);
            try
            {
                //ProcessingMethod(_downloadCts.Token);
            }
            catch (OperationCanceledException ex)
            {
                //TODO Do logging if needed.
            }
        }
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
    #endregion

    _downloadBlock.LinkTo(processBlock);
}

public void PushDataIntoPipeline(List<ConstructData> data)
{
    //Reset properties if previous batch was canceled
    if (_downloadCanceledByUser)
    {
        _downloadCts = new CancellationTokenSource();
        _downloadCanceledByUser = false;
    }
    //Push data into pipeline
    _ = data.Select(downloadBlock.SendAsync).ToList();
}
public void CancelData()
{
    _downloadCanceledByUser = true;
    _downloadCts.Cancel();
}
public void Reload()
{
    _circuitBreaker.Reset();
}
...