Спавн процессы, но только 5 одновременно - PullRequest
6 голосов
/ 05 декабря 2010

Я работаю над очередью с именами файлов. Каждый файл должен обрабатываться внешним двоичным файлом. Это прекрасно работает, но одновременно обрабатывает только один файл. Возможно ли одновременное создание двух процессов?

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

while (queue.Count > 0)
{
    string file = queue.Dequeue();

    Process p = new Process();    
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
    p.Start();
    p.WaitForExit();
}

Обновление: мне нравится решение от Alex LE (процессы Spawn, но только по 5 одновременно), но возможно ли дождаться выхода дочерних процессов, как предложил Бен Фойгт?

Редактировать 1: мне нужно проверить p.ExitCode == 0, чтобы сделать некоторые обновления базы данных.

Ответы [ 6 ]

3 голосов
/ 06 декабря 2010

Вот что должно было быть возможно, если бы дескриптор ожидания, связанный с процессом, был помечен как открытый, а не как внутренний, как в настоящее время (проголосуйте здесь, чтобы попросить Microsoft изменить это) :

void BatchProcess()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    ...
    queue.Enqueue("10000.mp3");

    WaitHandle[] subprocesses = new WaitHandle[Math.Min(queue.Count, 5)];
    for( int i = 0; i < subprocesses.Length; i++ ) {
        subprocesses[i] = Spawn(queue.Dequeue());
    }

    while (queue.Count > 0) {
        int j = WaitHandle.WaitAny(subprocesses);
        subprocesses[j].Dispose();
        subprocesses[j] = Spawn(queue.Dequeue());
    }

    WaitHandle.WaitAll(subprocesses);
    foreach (var wh in subprocesses) {
        wh.Dispose();
    }
}

ProcessWaitHandle Spawn(string args)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        return p.WaitHandle;
    }
}

Это было бы наиболее эффективным из возможных решений, поскольку не требуется никаких объектов синхронизации, кроме самих процессов Win32. В коде C # не требуется никаких дополнительных потоков и асинхронных вызовов методов, поэтому блокировка или другая синхронизация не требуются.

1 голос
/ 06 декабря 2010

Это работает (это будет легче с асинхронным ожиданием C # 5.0):

Queue<string> queue = new Queue<string>();
queue.Enqueue("1.mp3");
queue.Enqueue("2.mp3");
queue.Enqueue("3.mp3");
...
queue.Enqueue("10000.mp3");

int runningProcesses = 0;
const int MaxRunningProcesses = 5;
object syncLock = new object();

Action<string> run = new Action<string>(delegate(string file) {
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }
});

Action runNext = null;
runNext = delegate() {
    lock (syncLock) {
        if (queue.Count > 0) {
            run.BeginInvoke(queue.Dequeue(), new AsyncCallback(delegate {
                runNext();
            }), null);
        }
    }
};

while (runningProcesses++ < MaxRunningProcesses) {
    runNext();
}
1 голос
/ 05 декабря 2010

Извлечение некоторых частей вашего кода и добавление семафора:

Semaphore semX = new Semaphore(5, int.MaxValue);

void f(name, args) {
    using (Process p = new Process())
    {
        p.StartInfo.FileName = name;
        p.StartInfo.Arguments = args;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.StartInfo.RedirectStandardOutput = true;
        p.Start();
        p.WaitForExit();
    }

    semX.Release();     // !!! This one is important
}

Затем вы используете

while (queue.Count > 0)
{
    string file = queue.Dequeue();
    semX.WaitOne();    // !!!
    (new Thread((ThreadStart) (() => f(file, "")))).Start();    // dirty unreadable code to start a routine async
}

for (int n = 5; n > 0; n--)        // Wait for the last 5 to finish
    semX.WaitOne();

semX.Dispose();                    // Dispose the semaphore
0 голосов
/ 07 декабря 2010

Этот блок частично заблокирует основной поток на основе ответа Бена, но он уже запущен.

static void Run(string file)
{
    using (Process p = new Process()) {
        p.StartInfo.FileName = @"binary.exe";
        p.StartInfo.Arguments = file;
        p.StartInfo.UseShellExecute = false;
        p.StartInfo.CreateNoWindow = true;
        p.Start();
        p.WaitForExit();
    }
}

static WaitHandle RunAsync(string file)
{
    Action<string> result = new Action<string>(Run).BeginInvoke(file, null, null);
    return result.AsyncWaitHandle;
}

static void Main()
{
    Queue<string> queue = new Queue<string>();
    queue.Enqueue("1.mp3");
    queue.Enqueue("2.mp3");
    queue.Enqueue("3.mp3");
    queue.Enqueue("4.mp3");
    queue.Enqueue("5.mp3");
    queue.Enqueue("6.mp3");
    // ...
    queue.Enqueue("10000.mp3");


    const int MaxRunningProcesses = 5;

    List<WaitHandle> runningProcesses = new List<WaitHandle>(MaxRunningProcesses);

    while (queue.Count > 0 && runningProcesses.Count < MaxRunningProcesses) {
        runningProcesses.Add(RunAsync(queue.Dequeue()));
    }

    while (runningProcesses.Count > 0) {
        int j = WaitHandle.WaitAny(runningProcesses.ToArray());
        runningProcesses[j].Close();
        runningProcesses.RemoveAt(j);
        if (queue.Count > 0) {
            runningProcesses.Add(RunAsync(queue.Dequeue()));
        }
    }
}
0 голосов
/ 06 декабря 2010

В основном у вас есть проблема с потребителем производителя.Поэтому вы должны обязательно использовать коллекции в пространстве имен System.Collections.Concurrent .Вот простой пример, который вы можете просто применить к своей проблеме - в качестве дополнительного бонуса вы можете начать заполнять очередь и ее обработку одновременно!

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program
{
    static readonly BlockingCollection<string> _collection = new BlockingCollection<string>();

    static void Main()
    {
        const int maxTasks = 5;
        var tasks = new List<Task> {
            // startup publisher task...
            Task.Factory.StartNew(() => { 
                for(var i = 0; i < 1000; i++)
                {
                    _collection.Add(i + ".mp3");
                }
                Console.WriteLine("Publisher finished");
                _collection.CompleteAdding();
            }),
        };
        for (var i = 0; i < maxTasks; i++)
        {
            tasks.Add(Task.Factory.StartNew(ConsumerTask(i)));
        }
        Task.WaitAll(tasks.ToArray()); // wait for completion
    }

    static Action ConsumerTask(int id)
    {
        // return a closure just so the id can get passed
        return () =>
        {
            string item;
            while (true)
            {
                if (_collection.TryTake(out item, -1))
                {
                    using(Process p = new Process())
                    {
                        p.StartInfo.FileName = "binary.exe";
                        p.StartInfo.Arguments = item;
                        p.Start();
                        p.WaitForExit();
                        var exitCode = p.ExitCode;
                        // TODO handle exit code
                    }
                }
                else if (_collection.IsAddingCompleted)
                {
                    break; // exit loop
                }
            }
            Console.WriteLine("Consumer {0} finished", id);
        };
    }
}
0 голосов
/ 05 декабря 2010

Вы можете использовать семафоры для этого и асинхронно вызывать длительный процесс столько раз, сколько захотите:

private Semaphore _semaphore;
private delegate void Processor(string fileName);
[Test]
public void SetterTest() {
  var queue = new Queue<string>();
  queue.Enqueue("1.mp3");
  queue.Enqueue("2.mp3");
  queue.Enqueue("3.mp3");
  // ..
  queue.Enqueue("10000.mp3");
  var noOfThreads = 5;
  using (_semaphore = new Semaphore(noOfThreads, noOfThreads)) {
    while (queue.Count > 0) {
      string fileName;
      fileName = queue.Dequeue();
      _semaphore.WaitOne();
      new Processor(ProcessFile).BeginInvoke(fileName, null, null);
    }
    for (int i=0; i<noOfThreads; i++) _semaphore.WaitOne();
  }
}
private void ProcessFile(string file) {
  Process p;
  using (p = new Process()) {
    p.StartInfo.FileName = @"binary.exe";
    p.StartInfo.Arguments = file;
    p.StartInfo.UseShellExecute = false;
    p.StartInfo.CreateNoWindow = true;
    p.StartInfo.RedirectStandardOutput = true;
    p.Start();
    p.WaitForExit();
  }
  _semaphore.Release();
}

надеюсь, это поможет

...