Правильный способ использования HttpClient в Parallel.Foreach для выполнения большого количества запросов POST - PullRequest
0 голосов
/ 19 июня 2019

Как вы можете видеть в приведенном ниже коде, на высоком уровне этот код рекурсивно читает структуру папок и отправляет ее содержимое в API. Приложение .Net core 2.1.

У меня есть этот сервис, который делает POST для API.

    public class EnterpriseService
    {
        private readonly HttpClient _httpClient;

        public EnterpriseService(HttpClient httpClient)
        {
            _httpClient = httpClient;
        }

        public async Task<string> PostTransactionAsync(byte[] payload)
        {
            using (var request = new HttpRequestMessage(HttpMethod.Post, new Uri("https://www.foo.com/api/transaction")))
            {
                request.Content = new ByteArrayContent(payload);
                HttpResponseMessage response = await _httpClient.SendAsync(request);
                return await response.Content.ReadAsStringAsync();
            }
        }
    }

PostTransactionAsync вызывается вызывающей стороной следующим образом:

        protected async Task SearchFoldersAsync(List<FileStatusProperties> folders, string root, CancellationToken cancellationToken)
        {
            await Task.Run(() =>
            {
                return Parallel.ForEach(folders, async entry =>
                {
                    if (entry.Type == FileType.DIRECTORY)
                    {
                        await SearchFoldersAsync(
                            DataLakeStorage.DirectoryGetFiles($"{root}/{entry.PathSuffix}"),
                            $"{root}/{entry.PathSuffix}", cancellationToken);
                        return;
                    }

                    byte[] payload = DataLakeStorage.FileDownload($"{root}/{entry.PathSuffix}");
                    await _enterpriseService.PostTransactionAsync(payload);

                });
            }, cancellationToken);

        }

Обратите внимание, что я использую HttpClient, который DI'd как синглтон.

У меня также есть рекурсивное использование Parallel.Foreach.

Этот код отлично работает для структуры папок меньшего размера с 10K + файлами. Но когда число файлов увеличивается (скажем, оно достигает около 100 КБ файлов в папках), я получаю смесь этих двух ошибок. примерно 20% запросов удовлетворяются. 40% запросов заканчиваются этими двумя исключениями, каждый из которых вызывается _httpClient.SendAsync. Запросы не выполняются через 10 секунд.

Только одно использование каждого адреса сокета (протокол / сетевой адрес / порт) обычно разрешено

и

Операция была отменена. Невозможно прочитать данные из транспорта соединение: операция ввода-вывода была прервана из-за выход из потока или запрос приложения. Операция ввода / вывода была прервано из-за выхода из потока или запроса приложения

Я читал об использовании HttpClient и, насколько я могу судить, я не делаю ничего плохого. Но я не уверен, что его можно использовать с рекурсивным Parallel.ForEach.

Я хотел бы знать, каков рекомендуемый способ обработки этого сценария, когда мне нужно одновременно выполнять большое количество http-запросов?

Ответы [ 2 ]

1 голос
/ 19 июня 2019

Parallel - для параллелизма, который является формой параллелизма, которая использует несколько потоков для разделения работы с процессором на несколько ядер. Вам нужен асинхронный параллелизм, который является более подходящим подходом для одновременного выполнения нескольких операций ввода-вывода.

Асинхронный параллелизм проще всего выполнить, начав Task для каждого элемента (обычно используя Select), а затем выполнив await Task.WhenAll для всех этих задач. Примерно так:

protected async Task SearchFoldersAsync(List<FileStatusProperties> folders, string root, CancellationToken cancellationToken)
{
  var tasks = folders.Select(async entry =>
  {
    if (entry.Type == FileType.DIRECTORY)
    {
      await SearchFoldersAsync(
          DataLakeStorage.DirectoryGetFiles($"{root}/{entry.PathSuffix}"),
          $"{root}/{entry.PathSuffix}", cancellationToken);
      return;
    }

    byte[] payload = DataLakeStorage.FileDownload($"{root}/{entry.PathSuffix}");
    await _enterpriseService.PostTransactionAsync(payload);
  }).ToList();
  await Task.WhenAll(tasks);
}
0 голосов
/ 19 июня 2019

Я не собираюсь обращаться к Parallel vs async ...

Но это конкретная ошибка

Обычно разрешено только одно использование каждого адреса сокета (протокола / сетевого адреса / порта)

Казалось бы, из-за того, что вы можете иметь только около 65 тыс. Соединений от одной системы к одному порту в другой системе.

Предполагая, что существующий процесс сервера использует порт 80, вы можете запустить дополнительные процессы, которые используют другие порты. Но вам понадобится более 1 HttpClient и вам нужно будет округлить робин или что-то между ними. Слишком много процессов, и вы, вероятно, начнете превышать пределы открытых дескрипторов файлов на клиенте или сервере.

...