Шаблон для ограничения количества одновременных асинхронных вызовов - PullRequest
0 голосов
/ 09 апреля 2010

Мне нужно получить несколько объектов из внешней системы. Внешняя система поддерживает множественные одновременные запросы (то есть потоки), но возможно заполнение внешней системы - поэтому я хочу иметь возможность извлекать несколько объектов асинхронно, но я хочу иметь возможность регулировать количество одновременных асинхронных запросов. то есть мне нужно получить 100 элементов, но я не хочу, чтобы извлекал более 25 из них одновременно. Когда каждый запрос из 25 завершается, я хочу запустить другой поиск, и как только они будут завершены, я хочу вернуть все результаты в том порядке, в котором они были запрошены (т.е. нет смысла возвращать результаты, пока не будет возвращен весь вызов). ). Есть ли рекомендуемые образцы для такого рода вещей?

Подойдет ли что-то подобное (очевидно, псевдокод)?

  private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>;

  public List<externalSystemObjects> GetObjects(List<string> ids)
  {
      int callCount = 0;
      int maxCallCount = 25;
      WaitHandle[] handles;

      foreach(id in itemIds to get)
      {
          if(callCount < maxCallCount)
          {
               WaitHandle handle = executeCall(id, callback);
               addWaitHandleToWaitArray(handle)
          }
      else
      {
           int returnedCallId = WaitHandle.WaitAny(handles);
           removeReturnedCallFromWaitHandles(handles);
      }
   }

   WaitHandle.WaitAll(handles);

   return returnedObjects;
   }

   public void callback(object result)
   {
         returnedObjects.Add(result);
   }

Ответы [ 2 ]

1 голос
/ 09 апреля 2010

Рассмотрим список элементов для обработки в виде очереди, из которой 25 потоков обработки снимают задачи, обрабатывают задачи, добавляют результат и повторяют до тех пор, пока очередь не станет пустой:

 class Program
  {
    class State
    {
      public EventWaitHandle Done;
      public int runningThreads;
      public List<string> itemsToProcess;
      public List<string> itemsResponses;
    }

    static void Main(string[] args)
    {
      State state = new State();

      state.itemsResponses = new List<string>(1000);
      state.itemsToProcess = new List<string>(1000);
      for (int i = 0; i < 1000; ++i)
      {
        state.itemsToProcess.Add(String.Format("Request {0}", i));
      }

      state.runningThreads = 25;
      state.Done = new AutoResetEvent(false);

      for (int i = 0; i < 25; ++i)
      {
        Thread t =new Thread(new ParameterizedThreadStart(Processing));
        t.Start(state);
      }

      state.Done.WaitOne();

      foreach (string s in state.itemsResponses)
      {
        Console.WriteLine("{0}", s);
      }
    }

    private static void Processing(object param)
    {
      Debug.Assert(param is State);
      State state = param as State;

      try
      {
        do
        {
          string item = null;
          lock (state.itemsToProcess)
          {
            if (state.itemsToProcess.Count > 0)
            {
              item = state.itemsToProcess[0];
              state.itemsToProcess.RemoveAt(0);
            }
          }
          if (null == item)
          {
            break;
          }
          // Simulate some processing
          Thread.Sleep(10);
          string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId);
          lock (state.itemsResponses)
          {
            state.itemsResponses.Add(response);
          }
        } while (true);

      }
      catch (Exception)
      {
        // ...
      }
      finally
      {
        int threadsLeft = Interlocked.Decrement(ref state.runningThreads);
        if (0 == threadsLeft)
        {
          state.Done.Set();
        }
      }
    }
  }

Вы можете сделать то же самое, используя асинхронные обратные вызовы, нет необходимости использовать потоки.

0 голосов
/ 09 апреля 2010

Наличие некоторой структуры, похожей на очередь, для хранения ожидающих запросов - довольно распространенная модель. В веб-приложениях, где может быть несколько уровней обработки, вы видите подход в стиле «воронка», когда ранние части изменения обработки имеют большие очереди. Также может быть установлен какой-то тип приоритетов для очередей, запросы с более высоким приоритетом перетасовываются в верхнюю часть очереди.

Одна важная вещь, которую следует учитывать в вашем решении, состоит в том, что если частота поступления запросов выше, чем ваша скорость обработки (это может быть связано с атакой типа «отказ в обслуживании» или просто то, что какая-то часть обработки сегодня необычайно медленная), то ваш очереди будут увеличиваться без ограничений. Вы должны иметь некоторую политику, например, немедленно отклонять новые запросы, когда глубина очереди превышает какое-либо значение.

...