Конвейеры обработки данных и временная обработка ошибок, кажется, идут рука об руку, поэтому мне интересно узнать, смогу ли я получить 2 из лучших библиотек для них - TPL Dataflow и Polly соответственно - приятно играть вместе.
В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock
. В идеале я хотел бы заключить его в метод создания блока с такой подписью:
ITargetBlock<T> CreatePollyBlock<T>(
Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)
Было бы достаточно просто policy.Execute
выполнить действие изнутри ActionBlock
, но у меня есть 2 требования:
- В случае повторной попытки я не хочу повторять попытку элемента, который имеет приоритет над другими объектами, находящимися в очереди. Другими словами, когда вы терпите неудачу, вы переходите на задний план.
- Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало поступление новых элементов. А если установлено значение
ExecutionDataflowBlockOptions.MaxDegreeOfParallelism
, я не хочу, чтобы элементы, ожидающие повторной попытки, "подсчитали" против этого макс.
Чтобы удовлетворить эти требования, я полагаю, что мне нужен «внутренний» ActionBlock
с примененным пользователем ExecutionDataflowBlockOptions
и некоторый «внешний» блок, который отправляет элементы во внутренний блок и применяет любые ожидания и повторные попытки. логика (или то, что диктует политика) вне контекста внутреннего блока. Вот моя первая попытка:
// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();
public T Data { get; set; }
public Task Completion => _tcs.Task;
public void SetCompleted() => _tcs.SetResult(0);
public void SetFailed(Exception ex) => _tcs.SetException(ex);
}
ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
// create a block that marks WorkItems completed, and allows
// items to fault without faulting the entire block.
var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
try {
act(wi.Data);
wi.SetCompleted();
}
catch (Exception ex) {
wi.SetFailed(ex);
}
}, opts);
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
});
}
Чтобы проверить это, я создал блок с политикой ожидания и повтора и фиктивным методом, который выдает исключение в первые 3 раза, когда он вызывается (для всего приложения). Затем я дал ему некоторые данные:
"a", "b", "c", "d", "e", "f"
Я бы ожидал, что a, b и c потерпят неудачу и вернутся назад. Но я заметил, что они воздействуют на действие внутреннего блока в следующем порядке:
"a", "a", "a", "a", "b", "c", "d", "e", "f"
По сути, я не выполнил свои собственные требования, и довольно легко понять, почему: внешний блок не пропускает новые элементы, пока не произойдут все попытки текущего элемента. Простое, но, казалось бы, хакерское решение - добавить большое значение MaxDegreeOfParallelism
к внешнему блоку:
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
С этим изменением я заметил, что новые вещи действительно появляются до повторных попыток, но я также развязал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда находятся в конце:
"a", "e", "b", "c", "d", "a", "e", "b"
Так что это немного лучше. Но в идеале я бы хотел, чтобы порядок сохранился:
"a", "b", "c", "d", "e", "a", "b", "c"
Вот где я застрял, и, рассуждая об этом, мне интересно, возможно ли даже при этих ограничениях, в частности, что внутренние компоненты CreatePollyBlock
могут выполнять политику, но не могут определить это. Если бы эти внутренние компоненты могли, например, обеспечить повторную лямбду, я думаю, это дало бы мне гораздо больше возможностей. Но это часть политики определение , и по этой схеме я не могу этого сделать.
Заранее спасибо за любую помощь.