Как заставить этот асинхронный вызов метода работать? - PullRequest
4 голосов
/ 15 ноября 2010

Я пытался разработать конвейер метода с использованием асинхронного вызова метода. Логика для конвейера выглядит следующим образом

  1. В коллекции имеется n данных, которые должны быть переданы в число методов m в конвейере
  2. Перечислите коллекцию Т
  3. Введите первый элемент в первый метод
  4. Получить вывод, передать его во второй метод асинхронно
  5. Одновременно передать второй элемент коллекции первому методу
  6. После завершения первого метода передайте результат во второй метод (если второй метод все еще выполняется, поместите результат в его очередь и начните выполнение третьего элемента в первом методе)
  7. Когда второй метод завершает выполнение, берите первый элемент из очереди и выполняйте его и т. Д. (Каждый метод должен выполняться асинхронно, никто не должен ждать завершения следующего)
  8. В методе mth, после выполнения данных, сохранить результат в списке
  9. После завершения n-го элемента в m-м методе верните список результатов (n-количество результатов) на самый первый уровень.

Я придумал следующий код, но он не работал должным образом, результат никогда не возвращается и, более того, он не выполняется в том порядке, в каком он должен быть.

static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

Не могли бы вы, ребята, помочь мне заставить эту штуку работать? Если этот дизайн не подходит для реального метода конвейера, пожалуйста, не стесняйтесь предложить другой.

Редактировать : Я должен строго придерживаться .Net 3.5.

Ответы [ 3 ]

1 голос
/ 15 ноября 2010

Я не сразу обнаружил проблему в вашем коде, но вы можете немного усложнить ситуацию.Это может быть более простой способ сделать то, что вы хотите.

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

Это не создает блокировку для каждой стадии конвейера, как в вашей собственной попытке.Я пропустил это, потому что это не казалось полезным.Однако вы можете легко добавить его, обернув функции следующим образом:

var wrappedFunctions = functions.Select(x => AddStageLock(x));

, где AddStageLock это:

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

edit: The Execute реализация, вероятно, будет медленнее, чем однопотоковое выполнение, если только работа, выполняемая для каждого отдельного элемента, не затрачивает накладных расходов на создание дескриптора ожидания и планирование задачи в пуле потоков. Чтобы действительно извлечь выгоду из многопоточности, необходимонакладные расходы;PLINQ в .NET 4 делает это путем разбиения данных .

1 голос
/ 15 ноября 2010

Есть ли какая-то конкретная причина для перехода на трубопровод?IMO, запуск отдельного потока для каждого ввода со всеми функциями, связанными одна за другой, будет проще для написания и быстрее для выполнения.Например,

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

Теперь, перейдя к вашему коду, я считаю, что для точной реализации конвейера с M-этапом у вас должно быть ровно M потоков, так как каждый этап может выполняться параллельно - теперь некоторые потоки могут простаивать.потому что я / р не дошел до них.Я не уверен, запускает ли ваш код какие-либо потоки и каков будет счет потоков в определенное время.

0 голосов
/ 15 ноября 2010

Почему вы не прерываете поток для каждой итерации и не объединяете свои результаты в блокирующем ресурсе. Вам нужно только сделать. Могли бы использовать PLinq для этого. Я думаю, вы можете ошибочно принять методы за ресурсы. Вам нужно заблокировать метод, только если он имеет дело с критическим блоком с общим ресурсом в нем. Отбирая ресурс и переходя оттуда к новому потоку, вы избавляетесь от необходимости управлять вторым методом.

I.E .: Метод X вызывает метод 1, затем передает значение в метод 2 Foreach пункт в обр Асинхронный (MethodX (пункт));

...