Родитель не ждет завершения дочерних процессов, несмотря на пожатие - PullRequest
2 голосов
/ 27 мая 2019

Я полностью осознаю, что есть тонны статей, объясняющих внутреннюю работу динамики родительско-дочерних процессов.Я прошел через них и заставил свои вещи работать так, как я хочу, чтобы они почти работали.Но есть одна вещь, которая мешает мне, и я не могу понять это, несмотря на несколько попыток.

Проблема: Несмотря на то, что пожинает детей, main не ждет, пока все дети закончат работуи выходит преждевременно.Я считаю, что сделал правильный выход из дочернего процесса, и я установил REAPER в дочерний процесс - так как же происходит основной выход до завершения дочернего процесса?

Не ищите решения здесь - но мне нужно новое направление, где я мог бы биться в течение следующей недели.На данный момент - я чувствую, что исчерпал свои возможности и перепробовал много вещей, но безрезультатно.

Некоторые сведения о том, чего я пытаюсь достичь:

В общем, я хочу, чтобы все дети закончили, и только тогда я хочу продолжать что-то делать дальше.Каждый дочерний процесс порождает кучу потоков, и эти потоки должным образом соединяются с указанным дочерним процессом, который затем выполняет выход с помощью exit(0).

Дополнительной шумихой, которую вы можете наблюдать в программе, является не что иное, как нашатребование, когда мы должны использовать 5 API (движков), но только с фиксированным размером пакета, скажем, 10 для каждого, за один раз.Я запускаю дочерний процесс для каждого механизма и запускаю поток для каждого запроса, а затем жду завершения всех потоков, присоединяюсь к ним, и только после этого завершается дочерний процесс.Только теперь я мог отправить следующую партию запросов на один и тот же двигатель, и я делаю это для всех двигателей, пока не исчерпаю свои общие запросы, скажем, 10000.

Каждый запрос может занять от 1 секунды до 2 часов -в основном они представляют собой отчеты CSV, извлекаемые из HTTP API.

Моя проблема заключается в том, что, когда я исчерпал свой общий набор запросов - я не могу ждать, заставив ОСНОВНУЮ подождать, пока вседочерние процессы завершены.Это странная проблема, которую я пытаюсь решить.

Любые идеи?

Моя программа ВЫХОД:

[compuser@lenovoe470:little-stuff]$  perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$  1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)

В приведенном выше выводе:

  • Запуск batch_engine означает, что я запускаю пакет нумерованных задач.
  • BUSY_ENGINE означает, что конечная точка / механизм занят, поскольку он уже занят обработкоймаксимальный размер пакета запросов.Мне нужно подождать.
  • Законченный batch_engine означает, что дочерний процесс завершил обработку заданного пакета запросов для определенного механизма / конечной точки.Он выходит, и main обнаруживает, что текущий движок теперь свободен, и следующая партия может быть поставлена ​​в очередь
  • , если мы увидим последние 2 строки, очевидно, что выходные данные дочерних процессов переполнены, и основной выход преждевременно безв ожидании бегущих детей.Зачем?любая помощь?

Моя программа:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);


STDOUT->autoflush(1);


# doesn't work
  sub reaper {
    my $reaped;
    while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
      print "$$: reaped: $reaped\n";
      sleep(1);
    }
    $SIG{CHLD} = \&reaper;
  }
# doesn't work


my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };

my $proc_hash;
my $global_string = "ENGINE";

# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
  sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
      if ( WIFEXITED($?) ) 
      {
        # my
        my $ret_code = WEXITSTATUS($?);
        print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
        my $engine_name = $proc_hash->{$reaped_pid};
        delete ($proc_hash->{$reaped_pid});
        delete ($proc_hash->{$engine_name});
        # my

        # original
        #my $ret_code = WEXITSTATUS($?);
        #print "child process:$pid exit with code:$ret_code\n";
        # original
      }
    }
  }
#

$SIG{CHLD} = \&REAPER;

sub random_sleep_time {
  return (int(rand(5)+1))
  #return (sprintf "%.2f",(rand(1)+1))
}

sub task_runner {
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  STDOUT->autoflush(1);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  threads->exit(0);
  #print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
  #return;
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my $engine = (split (/_/,$batch_engine))[1];
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) { 
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    my $ppid = getppid();
    foreach my $tid (@tids) {$tid->join()}
    print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
    exit (0);
  }
}

sub update_proc_hash {
  my $finished_pid = waitpid (-1, POSIX->WNOHANG);
  if ($finished_pid > 0) {
    print "$$: PID ($finished_pid) has finished. updating proc hash\n";
    my $engine_name = $proc_hash->{$finished_pid};
    delete ($proc_hash->{$finished_pid});
    delete ($proc_hash->{$engine_name});
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
  foreach my $engine (@engines) {
    update_proc_hash();
    if (exists $proc_hash->{$engine}) {
      print "$$: BUSY_ENGINE: $engine.\n";
      sleep (1);
      next;
    }
    else {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0) {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks) {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
        task_orchestrator ("$batch_engine",@engine_tasks);
        $batch++;
      }
    }
  }
}

REAPER();

print "$$: FINISHED TASKS IN BATCHES\n";

__END__

Обновление через 3 дня: Спасибо СО сообществу.Еще раз, я благодарен всем вам, кто потратил свое время, чтобы разобраться в этом и помог выявить и исправить проблему.Огромное спасибо.

Позвольте мне поделиться новым выводом с финальной программой для всеобщего ознакомления.

ВЫХОД после использования исправления:

User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES

ЗАКЛЮЧИТЕЛЬНАЯ РАБОЧАЯ ПРОГРАММА:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;

STDOUT->autoflush(1);

my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
  e1 => 2,
  e2 => 3,
  e3 => 2,
};

my $proc_hash;
my $global_string = "engine";

sub REAPER {
  local ($!, $?);
  while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
    if ( WIFEXITED($?) ) {
      my $ret_code = WEXITSTATUS($?);
      print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
      my $engine_name = $proc_hash->{$reaped_pid};
      delete ($proc_hash->{$reaped_pid});
      delete ($proc_hash->{$engine_name});
    }
  }
}

$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }

sub task_runner {
  STDOUT->autoflush(1);
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
  threads->exit(0);
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    my $start_time = time;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) {
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    foreach my $tid (@tids) {$tid->join()}
    my $end_time = time;
    my $total_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
    exit (0);
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
  foreach my $engine (@engines)
  {
    if (exists $proc_hash->{$engine})
    {
      sleep (1);
      next;
    }
    else
    {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0)
      {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks)
      {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
        task_orchestrator ($batch_engine,@engine_tasks);
        $batch++;
      }
    }
  }
}

# All 3 below work properly
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);

print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__

Ответы [ 2 ]

3 голосов
/ 28 мая 2019

waitpid

может вернуть 0, если есть дочерние процессы, соответствующие PID, но ни один еще не завершен

и -1 это относится к любому дочернему процессу. Таким образом, ваша неблокирующая waitpid in REAPER завершает этот цикл while, как только приходит нулевой возврат, что происходит в вашем коде с несколькими детьми. Это возвращение 0 - это то, что позволяет нам ждать, пока есть незавершенные дочерние процессы, именно то, что вы хотите.

Один из способов - опросить неотрицательные доходы

use warnings;
use strict;
use feature 'say';

use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep) ;

for (1..4) { 
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { 
        sleep rand 4;  
        say "\tkid $$ exiting"; 
        exit;
    };  
}; 

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) { 
    say "got $kid" if $kid > 0;
    sleep 0.2;
}

печать

        kid 12687 exiting
got 12687
        kid 12690 exiting
got 12690
        kid 12689 exiting
got 12689
        kid 12688 exiting
got 12688

Пожалуйста, настройте период опроса соответствующим образом. Обратите внимание, что, так как он перехватывает любые дочерние процессы, он может вмешиваться в другие вилки, если к этому моменту были ожидаемые.

Или вы можете заблокировать с ожиданием

while ( (my $kid = waitpid -1, 0) > -1 ) { 
    say "got $kid";
}

, где вы теперь можете также сделать > 0, поскольку здесь не будет возврата 0, так как вызов блокирует. В то время как нам нужно только завершить цикл, как только -1 возвращается (больше никаких процессов), как и раньше.

Основное отличие состоит в том, что блок выполняется только после фактического завершения дочернего процесса, поэтому если вам нужно следить за тем, что делают некоторые долгосрочные дети (и, возможно, ограничить время их выполнения или защитить от зависших заданий), то есть не так просто в этой форме; для этого вам нужна неблокирующая операция.

Обратите внимание, что некоторые детали, в частности касающиеся возвратов, могут различаться в разных системах.


Наивной версией этого является ожидание только этих конкретных PID, собранных как вы fork

foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0;  # -1 if reaped already
}

, который блокируется с waitpid для каждого процесса. Проблема в том, что если один процесс выполняется намного дольше других (или зависает), этот цикл застревает в ожидании. И вообще, мы бы предпочли, чтобы дочерние процессы были запущены при выходе, а не в том порядке, в котором они были запущены.

1 голос
/ 27 мая 2019

При выходе из основного цикла вы вызываете REAPER (), который выполняет неблокирующую функцию waitpid (). Неблокируемая. Номера. И это не блокирует. Так что это выходит.

Пока я здесь, я отмечаю, что ваша функция update_proc_hash () не зацикливается, как другие функции, которые делают waitpid (), поэтому она не перехватывает все, что может. Сделайте себе одолжение и аккуратно разберитесь со всем этим.

...