Как регулировать все исходящие асинхронные вызовы HttpClient между несколькими потоками в проекте .net Core API - PullRequest
0 голосов
/ 27 августа 2018

Я разрабатываю веб-API ядра .net, который использует внешний API, который я не контролирую. Я нашел несколько отличных ответов о переполнении стека, которые позволили мне регулировать мои запросы к этому внешнему API, находясь в том же потоке с использованием семафорлиса. Мне интересно, как лучше расширить это регулирование, чтобы оно распространялось на приложения, а не просто регулирование для определенного списка задач. Я узнал о HttpMessageHandlers, и это, кажется, возможный способ перехвата всех исходящих сообщений и применения регулирования. Но я беспокоюсь о безопасности потоков и проблемах блокировки, которые я не могу понять. Я включаю свой текущий код регулирования и надеюсь, что это может помочь понять, что я пытаюсь сделать, но в нескольких потоках, и с задачами, которые постоянно добавляются вместо заранее определенного списка задач.

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}

1 Ответ

0 голосов
/ 28 августа 2018

Концептуальные вопросы

  • SemaphoreSlim является поточно-ориентированным, поэтому нет проблем с поточностью или блокировкой при использовании его в качестве дросселя параллелизма между несколькими потоками.
  • HttpMessageHandler действительно является механизмом исходящего промежуточного программного обеспечения для перехвата вызовов, осуществляемых через HttpClient. Таким образом, они являются идеальным способом применения регулирования параллелизма к вызовам Http с использованием SemaphoreSlim.

Простая реализация

Так что ThrottlingDelegatingHandler может выглядеть так:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

Создание и поддержка экземпляра как одиночного:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 

Примените это DelegatingHandler ко всем экземплярам HttpClient, через которые вы хотите выполнять параллельные вызовы:

HttpClient throttledClient = new HttpClient(throttle);

То, что HttpClient не обязательно должно быть одиночным: только экземпляр throttle.

Для краткости я опустил DI-код Dot Net Core, но вы должны зарегистрировать экземпляр синглтона ThrottlingDelegatingHandler в контейнере .Net Core, получить его с помощью DI в точке использования и использовать его в * 1034. * s вы строите как показано выше.

Но:

Лучшая реализация с HttpClientFactory (.NET Core 2.1)

Выше все еще возникает вопрос, как вы собираетесь управлять HttpClient временами жизни:

Одной из целей HttpClientFactory было управление жизненными циклами HttpClient экземпляров и их делегирующих обработчиков, чтобы избежать этих проблем.

В .NET Core 2.1 вы можете использовать HttpClientFactory, чтобы связать все это в ConfigureServices(IServiceCollection services) в классе Startup, например:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

("MyThrottledClient" здесь представляет собой подход именованного клиента только для краткости в этом примере; типизированные клиенты избегают именования строк.)

В точке использования получите IHttpClientFactory по DI ( ссылка ), затем позвоните

var client = _clientFactory.CreateClient("MyThrottledClient");

для получения HttpClient экземпляра, предварительно настроенного с помощью синглтона ThrottlingDelegatingHandler.

Все вызовы через экземпляр HttpClient, полученные таким образом, будут перенаправлены (как правило, через приложение) на изначально настроенный int maxParallelism.

И HttpClientFactory волшебным образом решает все проблемы HttpClient времени жизни.

Использование Polly с IHttpClientFactory для получения всего этого "из коробки"

Полли глубоко интегрирован с IHttpClientFactory , и Полли также предоставляет Политика перегородки , которая работает как дроссель параллелизма с помощью идентичного механизма SemaphoreSlim .

Таким образом, в качестве альтернативы ручному развертыванию ThrottlingDelegatingHandler, вы также можете просто использовать политику Polly Bulkhead с IHttpClientFactory из коробки. В вашем Startup классе просто:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

Получите предварительно настроенный экземпляр HttpClient из HttpClientFactory, как и ранее. Как и прежде, все вызовы через такой экземпляр HttpClient MyThrottledClient будут параллельно регулироваться настроенным maxParallelism.

Политика Polly Bulkhead дополнительно предлагает возможность настроить, сколько операций вы хотите разрешить одновременно «ставить в очередь» для слота выполнения в главном семафоре. Так, например:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);

при настройке, как указано выше, в HttpClient, разрешает 10 параллельных http-вызовов и до 100 http-вызовов в «очередь» для слота выполнения. Это может обеспечить дополнительную отказоустойчивость для систем с высокой пропускной способностью за счет предотвращения сбоя в нисходящей системе, вызывающего чрезмерное увеличение ресурсов при вызовах в очереди в восходящем направлении.

Чтобы использовать параметры Polly с HttpClientFactory, извлеките пакеты nuget для Microsoft.Extensions.Http.Polly и Polly.

Ссылки: Полли глубокая документация по Полли и IHttpClientFactory ; Политика переборки .


Приложение к задачам

В вопросе используется Task.Run(...) и упоминается:

веб-API ядра .net, который использует внешний API

и

с постоянно добавляемыми задачами вместо предопределенного списка задач.

Если ваш веб-интерфейс .net core использует только внешний API один раз для каждого запроса, обрабатывает веб-интерфейс .net core, и вы применяете подходы, обсуждаемые в оставшейся части этого ответа, разгрузив внешний http ниже по потоку. вызов нового Task с Task.Run(...) будет ненужным и создаст служебную информацию только в дополнительных Task экземплярах и переключении потоков. Dot net core уже будет выполнять входящие запросы в нескольких потоках в пуле потоков.

...