Один поток работает, а многопоточный не работает - PullRequest
0 голосов
/ 14 марта 2020

Ниже приведен мой текущий код, который получает 500 документов (формат JSON) из documentDB за вызов. Я могу сделать только 500 за поиск и добавить его в параллельную сумку (параллельно). Полученные данные основаны на идентификационном номере, который я предоставляю, где API и выбирает его из этого диапазона. Например, id = 500 [получает документы от 501 до 1000]. Приведенный ниже код заполняет параллельную сумку 25 тыс. Документов, как и ожидалось.

int threadNumber = 5;    
var concurrentBag = new ConcurrentBag<docClass>();

    if (batch == 25000)
    {
        id = 500;
        while (id <= 25000)
        {
         docs = await client.SearchDocuments<docClass>(GetFollowUpRequest(id), requestOptions);
         docClass lastdoc = docs.Documents.Last();
         lastid = lastdoc.Id.Id;

         Parallel.ForEach(docs.Documents, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, item =>
          {
              concurrentBag.Add(item);
          });
         id = id + 500;
        }
    }

Я хотел запустить все это время, пока l oop в потоке, чтобы я мог выполнять множественный вызов API и параллельно получать 500 документов. Я пытался изменить код, как показано ниже, но всегда я вижу только 500 документов, которые все еще находятся в параллельной сумке 'concurrentBag' после всего прогона, а идентификатор пропуска остается равным 500 и не увеличивается.

    int threadNumber = 5;    
    var concurrentBag = new ConcurrentBag<docClass>();

if (batch == 25000)
 {
     id = 500;
     Task[] tasks = new Task[threadNumber];

     for (int j = 0; j < threadNumber; j++)
     {
         tasks[j] = Task.Run(async() =>
         {
             while (id <= 25000)
             {
                 docs = await client.SearchDocuments<docClass>(GetFollowUpRequest(id), requestOptions);
                 docClass lastdoc = docs.Documents.Last();
                 lastid = lastdoc.Id.Id;

                 Parallel.ForEach(docs.Documents, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, item =>
                 {
                     concurrentBag.Add(item);
                 });

                 id = id + 500;
             }
         });
     }
 }

Не могли бы вы, пожалуйста помогите что я тут не так делаю?

1 Ответ

0 голосов
/ 14 марта 2020

Для загрузки документа из внешних ресурсов используйте асинхронный подход без лишних потоков.

Обратите внимание, что при параллельной загрузке внешних ресурсов дополнительные потоки не работают, а просто ждут ответа, поэтому потоки просто теряются;)

Асинхронный подход обеспечивает возможность запуска несколько запросов почти одновременно, не ожидая завершения каждой задачи, но ожидайте только тогда, когда все задачи будут готовы.

var maxDocuments = 25000;
var step = 500;
var documentTasks = Enumerable.Range(1, int.Max)
    .Select(offset => step * offset)
    .TakeWhile(id => id <= maxDocuments)
    .Select(id => client.Search<docClass>(GetFollowUpRequest(id), requestOptions))
    .ToArray();

await Task.WhenAll(documentTasks);

var allDocuments = documentTasks
    .Select(task = task.Result)
    .SelectMany(documents => documents)
    .ToArray();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...