Как запустить набор функций параллельно и ждать результатов по завершении? - PullRequest
4 голосов
/ 01 ноября 2010

У меня есть требование запускать набор тяжелых функций в одно и то же время и заполнять результаты в списке. Вот псевдокод для этого:

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

foreach(var task in tasks)
{
    // Run Logic in question
    1. Run each task asynchronously/parallely
    2. Put the results in the results list upon each task completion
}

Console.WriteLine("All tasks completed and results populated");

Мне нужна логика внутри foreach Бока. Ребята, не могли бы вы мне помочь?

У меня есть некоторые ограничения: решение должно быть совместимым с .net 3.5 (не .net 4, но для моих знаний будет полезно альтернативное решение .net 4)

Заранее спасибо.

Ответы [ 6 ]

4 голосов
/ 01 ноября 2010

Простая реализация 3.5 может выглядеть следующим образом

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

ManualResetEvent waitHandle = new ManualResetEvent(false);
void RunTasks()
{
    int i = 0;
    foreach(var task in tasks)
    {
        int captured = i++;
        ThreadPool.QueueUserWorkItem(state => RunTask(task, captured))
    }

    waitHandle.WaitOne();

    Console.WriteLine("All tasks completed and results populated");
}

private int counter;
private readonly object listLock = new object();
void RunTask(Func<T, TResult> task, int index)
{
    var res = task(...); //You haven't specified where the parameter comes from
    lock (listLock )
    {
       results[index] = res;
    }
    if (InterLocked.Increment(ref counter) == tasks.Count)
        waitHandle.Set();
}
4 голосов
/ 01 ноября 2010
List<Func<T, TResult>> tasks = PopulateTasks();
TResult[] results = new TResult[tasks.Length];
Parallel.For(0, tasks.Count, i =>
    {
        results[i] = tasks[i]();
    });

TPL для 3.5, очевидно, существует .

1 голос
/ 01 ноября 2010

Другой вариант будет с небольшой реализацией шаблона будущего:

    public class Future<T>
    {
        public Future(Func<T> task)
        {
            Task = task;
            _asyncContext = task.BeginInvoke(null, null);
        }

        private IAsyncResult _asyncContext;

        public Func<T> Task { get; private set; }
        public T Result
        {
            get
            {
                return Task.EndInvoke(_asyncContext);
            }
        }

        public bool IsCompleted
        {
            get { return _asyncContext.IsCompleted; }
        }
    }

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<Future<T>> asyncContext = new List<Future<T>>();
        foreach (var task in tasks)
        {
            asyncContext.Add(new Future<T>(task));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures)
    {
        foreach (var future in futures)
        {
            yield return future.Result;
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var futures = RunAsync(tasks);
        var results = WaitForAll(futures);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
    }
1 голос
/ 01 ноября 2010
    public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<IAsyncResult> asyncContext = new List<IAsyncResult>();
        foreach (var task in tasks)
        {
            asyncContext.Add(task.BeginInvoke(null, null));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext)
    {
        IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator();
        foreach (var task in tasks)
        {
            iterator.MoveNext();
            yield return task.EndInvoke(iterator.Current);
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var asyncContext = RunAsync(tasks);
        var results = WaitForAll(tasks, asyncContext);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    }
0 голосов
/ 02 ноября 2010

Выполняйте обработку в отдельных рабочих экземплярах, каждый в своем собственном потоке.Используйте обратный вызов, чтобы передать результаты и сообщить вызывающему процессу, что поток завершен.Используйте словарь для отслеживания запущенных потоков.Если у вас много потоков, вы должны загрузить очередь и запустить новые потоки, как старые.В этом примере все потоки создаются перед запуском любого из них, чтобы предотвратить состояние гонки, при котором число запущенных потоков падает до нуля до запуска финальных потоков.

    Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>();
    void LaunchWorkers()
    {
        foreach (var task in tasks)
        {
            Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult));
            Thread thread = new Thread(worker.Done);
            thread.IsBackground = true;
            activeThreads.Add(thread.ManagedThreadId, thread);
        }
        lock (activeThreads)
        {
            activeThreads.Values.ToList().ForEach(n => n.Start());
        }
    }

    void ProcessResult(int threadId, TResult result)
    {
        lock (results)
        {
            results.Add(result);
        }
        lock (activeThreads)
        {
            activeThreads.Remove(threadId);
            // done when activeThreads.Count == 0
        }
    }
}

public delegate void WorkerDoneDelegate(object results);
class Worker
{
    public WorkerDoneDelegate Done;
    public void Work(Task task, WorkerDoneDelegate Done)
    {
        // process task
        Done(Thread.CurrentThread.ManagedThreadId, result);
    }
}
0 голосов
/ 01 ноября 2010

традиционным способом является использование Семпахора.Инициализируйте семафор с количеством используемых потоков, затем запускайте поток для каждой задачи и ожидайте объект семафора.Когда каждый поток завершается, он должен увеличивать семафор.Когда счетчик семафоров достигнет 0, основной поток, который ожидал, продолжится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...