Использование Polly с потоком данных TPL - PullRequest
0 голосов
/ 06 сентября 2018

Конвейеры обработки данных и временная обработка ошибок, кажется, идут рука об руку, поэтому мне интересно узнать, смогу ли я получить 2 из лучших библиотек для них - TPL Dataflow и Polly соответственно - приятно играть вместе.

В качестве отправной точки я хотел бы применить политику обработки ошибок к ActionBlock. В идеале я хотел бы заключить его в метод создания блока с такой подписью:

ITargetBlock<T> CreatePollyBlock<T>(
    Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)

Было бы достаточно просто policy.Execute выполнить действие изнутри ActionBlock, но у меня есть 2 требования:

  1. В случае повторной попытки я не хочу повторять попытку элемента, который имеет приоритет над другими объектами, находящимися в очереди. Другими словами, когда вы терпите неудачу, вы переходите на задний план.
  2. Что еще более важно, если перед повторной попыткой есть период ожидания, я не хочу, чтобы это блокировало поступление новых элементов. А если установлено значение ExecutionDataflowBlockOptions.MaxDegreeOfParallelism, я не хочу, чтобы элементы, ожидающие повторной попытки, "подсчитали" против этого макс.

Чтобы удовлетворить эти требования, я полагаю, что мне нужен «внутренний» ActionBlock с примененным пользователем ExecutionDataflowBlockOptions и некоторый «внешний» блок, который отправляет элементы во внутренний блок и применяет любые ожидания и повторные попытки. логика (или то, что диктует политика) вне контекста внутреннего блока. Вот моя первая попытка:

// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
    private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();

    public T Data { get; set; }
    public Task Completion => _tcs.Task;

    public void SetCompleted() => _tcs.SetResult(0);
    public void SetFailed(Exception ex) => _tcs.SetException(ex);
}

ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
    // create a block that marks WorkItems completed, and allows
    // items to fault without faulting the entire block.
    var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
        try {
            act(wi.Data);
            wi.SetCompleted();
        }
        catch (Exception ex) {
            wi.SetFailed(ex);
        }
    }, opts);

    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    });
}

Чтобы проверить это, я создал блок с политикой ожидания и повтора и фиктивным методом, который выдает исключение в первые 3 раза, когда он вызывается (для всего приложения). Затем я дал ему некоторые данные:

"a", "b", "c", "d", "e", "f"

Я бы ожидал, что a, b и c потерпят неудачу и вернутся назад. Но я заметил, что они воздействуют на действие внутреннего блока в следующем порядке:

"a", "a", "a", "a", "b", "c", "d", "e", "f"

По сути, я не выполнил свои собственные требования, и довольно легко понять, почему: внешний блок не пропускает новые элементы, пока не произойдут все попытки текущего элемента. Простое, но, казалось бы, хакерское решение - добавить большое значение MaxDegreeOfParallelism к внешнему блоку:

return new ActionBlock<T>(async x => {
    await policy.ExecuteAsync(async () => {
        var workItem = new WorkItem<T> { Data = x };
        await innerBlock.SendAsync(workItem);
        await workItem.Completion;
    });
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });

С этим изменением я заметил, что новые вещи действительно появляются до повторных попыток, но я также развязал некоторый хаос. Вход во внутренний блок стал случайным, хотя первые 3 элемента всегда находятся в конце:

"a", "e", "b", "c", "d", "a", "e", "b"

Так что это немного лучше. Но в идеале я бы хотел, чтобы порядок сохранился:

"a", "b", "c", "d", "e", "a", "b", "c"

Вот где я застрял, и, рассуждая об этом, мне интересно, возможно ли даже при этих ограничениях, в частности, что внутренние компоненты CreatePollyBlock могут выполнять политику, но не могут определить это. Если бы эти внутренние компоненты могли, например, обеспечить повторную лямбду, я думаю, это дало бы мне гораздо больше возможностей. Но это часть политики определение , и по этой схеме я не могу этого сделать.

Заранее спасибо за любую помощь.

...