Как порождать несколько потоков асинхронно для вызова метода несколько раз в C #? - PullRequest
0 голосов
/ 08 января 2019

Мне нужно несколько раз вызвать рабочий метод, чтобы загрузить данные в базу данных. Я хочу сделать некоторые параллелизм с этим и быть в состоянии указать количество потоков для использования. Я думал об использовании оператора мод для разделения рабочей нагрузки, но застрял на том, как реализовать с async await.

Таким образом, асинхронный метод должен создать n количество потоков, а затем вызвать рабочий метод, чтобы параллельно было n потоков работы. Рабочий метод является синхронным.

Я попробовал, но вполне уверен, как реализовать то, что я хочу. Есть шаблон для этого?

Какой-то код, с которым я играл:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace TestingAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            int threads = 3;
            int numItems = 10;
            Task task = ThreadIt(threads, numItems);
        }

        static async Task ThreadIt(int threads, int numItems)
        {
            Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
            for (int i = 0; i < numItems; i++)
            {
                Console.Write($"Adding item: {i} mod 1: {i % threads}. ");
                int task = await DoSomeWork(i%threads, 500);
            }
        }

        static async Task<int> DoSomeWork(int Item, int time)
        {
            Console.Write($"    Working.on item {Item}..");
            Thread.Sleep(time );
            Console.WriteLine($"done.");
            return Item;
        }
    }
}

EDIT:

Я собираюсь перефразировать, потому что, может быть, я не совсем понял свои требования.

Я хочу создать n количество потоков. Будет обработано x количество элементов, и я хочу, чтобы они были поставлены в очередь с помощью mod (или что-то еще), а затем параллельно обработаны в потоках n. Когда один элемент завершен, я хочу, чтобы следующий элемент был обработан немедленно , а не ждать завершения всех трех потоков. Некоторые элементы будут обрабатываться дольше, чем другие , может быть, даже до 10 раз дольше, поэтому другим потокам не следует ожидать завершения одного из потоков.

Например, если у нас 3 темы и 9 элементов, это может произойти:

thread1: items 0,3,6 
thread2: items 1,4,7
thread3: items 2,5,8

каждый поток обрабатывает свою рабочую нагрузку по порядку и не ожидает между каждым элементом.

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019

Мне нужно несколько раз вызвать рабочий метод, чтобы загрузить данные в базу данных. Я хочу сделать параллелизм с этим и иметь возможность указать количество потоков для использования ... Рабочий метод является синхронным ... Есть ли шаблон для этого?

Да, библиотека параллельных задач.

Дано:

static int DoSomeWork(int Item, int time)
{
  Console.Write($"    Working.on item {Item}..");
  Thread.Sleep(time);
  Console.WriteLine($"done.");
  return Item;
}

Вы можете распараллелить его так:

static List<int> ThreadIt(int threads, int numItems)
{
  Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
  var items = Enumerable.Range(0, numItems);
  return items.AsParallel().WithDegreeOfParallelism(threads)
      .Select(i => DoSomeWork(i, 500))
      .ToList();
}
0 голосов
/ 08 января 2019

Вы можете попытаться создать List<Task<T>> и запустить их, а затем await с WhenAll, если вы хотите, чтобы все задачи были выполнены, или WhenAny, если какая-либо из них завершится:

static async Task ThreadIt(int threads, int numItems)
{
      List<Task<int>> tasks = new List<Task<int>>();
      Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
      for (int i = 0; i < numItems; i++)
      {
           Console.Write($"Adding item: {i} mod 1: {i % threads}. ");
           tasks.Add(DoSomeWork(i%threads, 500));
      }

       var result = await Task.WhenAll(tasks);
}

и при использовании Task, async и await мы должны использовать Task.Delay вместо Thread.Sleep:

static async Task<int> DoSomeWork(int Item, int time)
{
      Console.Write($"    Working.on item {Item}..");
      await Task.Delay(time); // note this 
      Console.WriteLine($"done.");
      return Item;
}

EDIT:

Вы можете создать ConcurrentQueue, а затем удалять каждый раз, когда 3 Задачи завершаются, и генерировать следующие 3 типа:

static async Task ThreadIt(int threads, int numItems)
{
      ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
      Enumerable.Range(0, 10).ForEach(x => queue.Enqueue(x));
      List<Task<int>> tasks = new List<Task<int>>();
      Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
      while (!queue.IsEmpty)
      {
          for (int i = 0; i < threads; i++)
          {
              if(queue.TryDequeue(out int val))
              {
                   Console.Write($"Adding item: {val} mod 1: {val % threads}. ");
                   tasks.Add(DoSomeWork(val%threads, 500));
              }
          }

       var result = await Task.WhenAll(tasks);
     }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...