Создание нескольких процессов с помощью PHP для обработки данных. - PullRequest
4 голосов
/ 07 января 2010

У меня есть очередь (Amazon SQS) данных, которую необходимо обработать, и я хотел бы сделать это с несколькими процессами (в PHP).

Я хочу, чтобы работающие дети делали что-то вроде этого (псевдоиш код):



while(true) {

    $array = $queue->fetchNItems(10); // get 10 items

    if(!count($array)) 
        killProcess();

    foreach($array as $item) {
         ... // process the item
         $queue->remove($item);
    }

    sleep(2);
}


Мне всегда нужен 1 дочерний процесс, но в случае необходимости я хочу (разветвлять?) Дочерний процесс, чтобы он мог помочь обработать очередь быстрее.

Может кто-нибудь помочь мне с грубым PHP-скелетом того, что мне нужно, или указать мне правильное направление?

Я думаю, мне нужно взглянуть на http://php.net/manual/en/function.pcntl-fork.php,, но я не уверен, как я могу использовать это для управления несколькими процессами.

Ответы [ 2 ]

2 голосов
/ 07 января 2010

Когда вы разворачиваете процесс. вы делаете копию этого процесса. Другими словами, копия (форк) содержит все, что имел оригинальный процесс (включая файловые дескрипторы)

Так откуда вы знаете, являетесь ли вы родителем или разветвленным процессом?

Пример со связанной страницы показывает это довольно четко

<?php

$pid = pcntl_fork();
if ($pid == -1) {
     die('could not fork');
} else if ($pid) {
     // we are the parent
     pcntl_wait($status); //Protect against Zombie children
} else {
     // we are the child
}

?>

Чтобы расширить это до того, что вы хотите

<?php

$pid = pcntl_fork();
if ($pid == -1) {
     die('could not fork');
} else if ($pid) {
     // we are the parent
     pcntl_wait($status); //Protect against Zombie children
} else {
     // we are the child
     while(true) {

         $array = $queue->fetchNItems(10); // get 10 items

         if(!count($array)) {
            exit();
         }

         foreach($array as $item) {
              ... // process the item
              $queue->remove($item);
         }

         sleep(2);
     }
}

?>

Это создаст для разветвленного процесса (в данном случае - отходов) использование цикла для создания нескольких процессов. когда дочерний процесс завершил выход, он завершит дочерний процесс. и pcntl_wait () вернется, что позволит родителю продолжить. Я не уверен насчет php, но если родительский процесс умирает или завершается, он убивает дочерний процесс, даже если дочерний процесс не завершен. отсюда pcntl_wait. если вы породили нескольких детей, требуется более сложная система.

возможно, вместо того, чтобы разветвляться, вам следует взглянуть на диапазон исполняемых функций?

Предостережение.

процесс разветвления может быть вызван проблемами, дескрипторы базы данных закрываются при выходе из дочернего процесса и т. Д. Вы также можете убить сервер для многих процессов, если что-то пойдет не так. тратить много времени на игры, тестирование и чтение.

DC

0 голосов
/ 28 сентября 2015

Я знаю, что это старая ветка, но выглядело так, как будто она может использовать более полный ответ. Так я обычно создаю несколько процессов в PHP.

Предостережение: PHP должен был умереть. То есть, язык должен был исполниться в течение нескольких секунд, а затем завершиться. Хотя очистка мусора в PHP прошла долгий путь, будьте осторожны. Контролируйте свои процессы на предмет неожиданного потребления памяти или других странностей. Наблюдайте за всем, как ястреб, какое-то время, прежде чем установить его и забыть об этом, и даже тогда периодически проверяйте процессы или заставляйте их автоматически уведомлять, если что-то становится не так.

Когда я набирал это, мне показалось, что хорошей идеей было поставить его на github .

Когда все будет готово для запуска программы, я рекомендую выполнить tail -f в журнале, чтобы увидеть вывод.

<?php
/*
 * date: 27-sep-2015
 * auth: robert smith
 * info: run a php daemon process
 * lic : MIT License (see LICENSE.txt for details)
 */    
$pwd = realpath("");

$daemon = array(
  "log"      => $pwd."/service.log",
  "errorLog" => $pwd."/service.error.log",
  "pid_file" => $pwd."/",
  "pid"      => "",
  "stdout"   => NULL,
  "stderr"   => NULL,
  "callback" => array("myProcessA", "myProcessB")
  );

/*
 * main (spawn new process)
 */
foreach ($daemon["callback"] as $k => &$v)
  {
  $pid = pcntl_fork();

  if ($pid < 0)
    exit("fork failed: unable to fork\n");

  if ($pid == 0)
    spawnChores($daemon, $v);
  }

exit("fork succeeded, spawning process\n");
/*
 * end main
 */

/*
 * functions
 */
function spawnChores(&$daemon, &$callback)
  {
  // become own session
  $sid = posix_setsid();

  if ($sid < 0)
    exit("fork failed: unable to become a session leader\n");

  // set working directory as root (so files & dirs are not locked because of process)
  chdir("/");

  // close open parent file descriptors system STDIN, STDOUT, STDERR
  fclose(STDIN);
  fclose(STDOUT);
  fclose(STDERR);

  // setup custom file descriptors
  $daemon["stdout"] = fopen($daemon["log"], "ab");
  $daemon["stderr"] = fopen($daemon["errorLog"], "ab");

  // publish pid
  $daemon["pid"] = sprintf("%d", getmypid());
  file_put_contents($daemon["pid_file"].$callback.".pid", $daemon["pid"]."\n");

  // publish start message to log
  fprintf($daemon["stdout"], "%s daemon %s started with pid %s\n", date("Y-M-d H:i:s"), $callback, $daemon["pid"]);

  call_user_func($callback, $daemon);

  // publish finish message to log
  fprintf($daemon["stdout"], "%s daemon %s terminated with pid %s\n", date("Y-M-d H:i:s"), $callback, $daemon["pid"]);

  exit(0);
  }

function myProcessA(&$daemon)
  {
  $run_for_seconds = 30;
  for($i=0; $i<$run_for_seconds; $i++)
    {
    fprintf($daemon["stdout"], "Just being a process, %s, for %d more seconds\n", __FUNCTION__, $run_for_seconds - $i);
    sleep(1);
    }
  }

function myProcessB(&$daemon)
  {
  $run_for_seconds = 30;
  for($i=0; $i<$run_for_seconds; $i++)
    {
    fprintf($daemon["stdout"], "Just being a process, %s, for %d / %d seconds\n", __FUNCTION__, $i, $run_for_seconds);
    sleep(1);
    }
  }
?>
...