С помощью в разделе комментариев я пришел к выводу, что ответ довольно прост - мне просто нужно передать CancellationToken
в policy.Execute()
метод, например:
CancellationTokenSource cts = new CancellationTokenSource();
policy.Execute(p =>
{
//ThisMethodMyThrowWebException();
data.Status = Status.Completed;
}, cts.Token);
РЕДАКТИРОВАТЬ: По запросу других пользователей я обновляю свой ответ полным образцом кода, который пытается ответить на вопросы в разделе комментариев.
Это немного длинновато, но по порядку чтобы справиться с OperationCancelledException
, мне нужно было придумать logi c, как различать guish между исключением, которое выбрасывается из метода (и, следовательно, его следует повторить), и тем, которое было выброшено, потому что пользователь решил отменить job.
Я также решил включить дополнительный код с CircuitBreaker
, потому что без него этот ответ был бы неполным, на мой взгляд. Итак, здесь мы go!
private TransformBlock<ConstructData, ConstructData> _downloadBlock;
private AsyncCircuitBreakerPolicy _circuitBreaker;
//This flag will indicate to circuitBreaker and catch block
//that OperationCanceledException was induced by user and
//therefore it should not be looked upon as worthy by circuitBreaker
//to decide when to open circuit.
private bool _downloadCanceledByUser;
private CancellationTokenSource _downloadCts;
private void CreatePipeline()
{
var failedItemQueue = new Queue<ConstructData>();
_circuitBreaker
= Policy.Handle<WebException>()
.Or<OperationCanceledException>(ex => !_downloadCanceledByUser)
.CircuitBreakerAsync(5, TimeSpan.FromDays(5),
(exception, timespan) => { },
() =>
{
while (failedItemQueue.Count > 0)
{
_downloadBlock.Post(failedItemQueue.Dequeue());
}
});
var retryPolicy = Policy.Handle<WebException>().Or<OperationCanceledException>()
.WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(1));
#region DownloadBlock
_downloadBlock = new TransformBlock<ConstructData, ConstructData>(async (construct) =>
{
var downloadPolicy = retryPolicy.WrapAsync(circuitBreaker);
try
{
await downloadPolicy.ExecuteAsync(async fp =>
{
//await MethodThatMayThrowWebException(_downloadCts.Token);
//await MethodThatMayThrowOperationCanceledException(_downloadCts.Token);
construct.Status = DownloadFileStatus.Downloaded;
}, _downloadCts.Token);
}
catch (WebException ex)
{
construct.Status = DownloadFileStatus.Failed;
//Here after 2 failed retries guaranteed by retryPolicy,
//I repost failed item back to the queue.
_downloadBlock.Post(construct);
}
catch (OperationCanceledException)
{
construct.Status = DownloadFileStatus.Canceled;
//In case if OperationCanceledException was thrown from
//MethodThatMayThrowOperationCanceledException()
//item gets posted back to downloadBlock for retry.
if (!_downloadCanceledByUser)
_downloadBlock.Post(construct);
}
//When total exception count reaches circuitBreaker threshold, circuit is left in open state
catch (BrokenCircuitException ex)
{
//TODO Notify user that he needs to check internet connection and press reload.
construct.Status = DownloadFileStatus.Failed;
failedItemQueue.Enqueue(construct);
}
return construct;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
#endregion DownloadBlock
#region ProcessBlock
var processBlock = new ActionBlock<ConstructData>(construct =>
{
if (construct.Status == DownloadFileStatus.Downloaded)
{
var fullName = string.Concat(construct.Path, construct.Name);
try
{
//ProcessingMethod(_downloadCts.Token);
}
catch (OperationCanceledException ex)
{
//TODO Do logging if needed.
}
}
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
#endregion
_downloadBlock.LinkTo(processBlock);
}
public void PushDataIntoPipeline(List<ConstructData> data)
{
//Reset properties if previous batch was canceled
if (_downloadCanceledByUser)
{
_downloadCts = new CancellationTokenSource();
_downloadCanceledByUser = false;
}
//Push data into pipeline
_ = data.Select(downloadBlock.SendAsync).ToList();
}
public void CancelData()
{
_downloadCanceledByUser = true;
_downloadCts.Cancel();
}
public void Reload()
{
_circuitBreaker.Reset();
}