Потоки, работающие внутри дочернего процесса, не истекают в соответствии с установленным временем будильника - PullRequest
1 голос
/ 30 мая 2019

Этот вопрос является расширением моего предыдущего вопроса .

Единственное добавление здесь заключается в том, что теперь я бы хотел, чтобы потоки прерывали работу, если операция не завершена в течение установленного времени ожидания..

Итак, пройдя через perldoc и некоторые примеры, я набросал рабочую версию, и она работает - но что-то отключено.

Время ожидания точно не наступает в установленной продолжительностино иногда это занимает вдвое больше времени.

Я не могу исследовать это, и мне нужна ваша помощь в том же, как исследовать это дальше.

В этой программе я установилзначение тайм-аута до 3 секунд с помощью функции будильника и установлен обработчик сигнала в функции потока.

Мои наблюдения во время нескольких прогонов следующие:

  1. Тайм-аут никогда не происходит ниже установленногозначение.
  2. время ожидания всегда выше установленного значения, и его значение никогда не бывает точно установленным значением.

Пожалуйста,найти вывод программы и саму программу для вашего обзора.Ваши комментарии и отзывы приветствуются.

Выход программы:

(20:51:59) $:little-stuff>  perl 10-20190530-batch-processing-using-threads-with-busy-pool-detection-2-with-threads-timeout.pl 12
29872: STARTING TASKS IN BATCHES
29872: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
29872: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
29872: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
29878: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.27) seconds
29878: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (2.12) seconds
29876: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (2.97) seconds
29878: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (2.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (29878) has finished with status (0).
29877: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (3.65) seconds
29877: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (4.62) seconds
29872: MAIN: engine (e3) is RUNNING batch #4 tasks: (8 9)
29876: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (6.43) seconds
29876: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (6.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (29876) has finished with status (0).
29872: MAIN: engine (e1) is RUNNING batch #5 tasks: (10 11)
29877: TASK_RUNNER: engine (e2), batch #2 task #5 has TIMED OUT in (6.00) seconds !!
29877: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (6.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (29877) has finished with status (0).
29872: MAIN: engine (e2) is RUNNING batch #6 tasks: (12)
30000: TASK_RUNNER: engine (e3) finished batch #4 task #8 in (5.28) seconds
30059: TASK_RUNNER: engine (e1) finished batch #5 task #10 in (3.02) seconds
30059: TASK_RUNNER: engine (e1) finished batch #5 task #11 in (3.58) seconds
30059: TASK_ORCHESTRATOR: engine (e1) finished batch #5 tasks in (3.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (30059) has finished with status (0).
30000: TASK_RUNNER: engine (e3), batch #4 task #9 has TIMED OUT in (6.00) seconds !!
30000: TASK_ORCHESTRATOR: engine (e3) finished batch #4 tasks in (6.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (30000) has finished with status (0).
30129: TASK_RUNNER: engine (e2) finished batch #6 task #12 in (3.95) seconds
30129: TASK_ORCHESTRATOR: engine (e2) finished batch #6 tasks in (3.00) seconds.
29872: REAPER: TASK_ORCHESTRATOR pid (30129) has finished with status (0).
29872: ALL ORCHESTRATORS HAVE FINISHED
29872: FINISHED TASKS IN BATCHES

Программа:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);

STDOUT->autoflush(1);

my $timeout = 3;
my @total_tasks = (1 .. shift || 10);
my $sleep_time = 6; 
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 3, e3 => 2, };

my $proc_hash;
my $global_string = "engine";

### 
# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
  sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
      if ( WIFEXITED($?) )
      {
        my $ret_code = WEXITSTATUS($?);
        print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
        my $engine_name = $proc_hash->{$reaped_pid};
        delete ($proc_hash->{$reaped_pid});
        delete ($proc_hash->{$engine_name});
      }
    }
  }
#
###

$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time) + 1)) }

sub task_runner {
  my @args = @_;

  STDOUT->autoflush(1);
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  my ($batch, $engine) = split (/_/,$batch_engine);

  my $start_time = time;
  #my $end_time = undef;
  #my $tot_time = undef;

  $SIG{ALRM} = sub {
    my $end_time = time;
    my $tot_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_RUNNER: $global_string ($engine), batch #$batch".
          " task #$task has TIMED OUT in ($tot_time) seconds !!\n";
    threads->exit(0);
  };

  my $task_time = random_sleep_time();
  sleep ($task_time);
  #sleep (random_sleep_time());
  #$end_time = time;
  #$tot_time = sprintf ("%.2f",($end_time - $start_time));
  #print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($tot_time) seconds\n";
  print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
  threads->exit(0);
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    my @tids;
    alarm ($timeout);
    STDOUT->autoflush(1);
    my $start_time = time;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) {
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }

    $SIG{ALRM} = sub { 
      foreach my $t (@tids) {
        if ($t->is_running()) { $t->kill('ALRM') } 
      }
    };

    foreach my $tid (@tids) {$tid->join()}
    my $end_time = time;
    my $tot_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($tot_time) seconds.\n";
    exit (0);
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
  foreach my $engine (@engines)
  {
    if (exists $proc_hash->{$engine})
    {
      sleep (1);
      next;
    }
    else
    {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0)
      {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks)
      {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
        task_orchestrator ("$batch_engine",@engine_tasks);
        $batch++;
      }
    }
  }
}

# All Work fine
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);
# All Work fine

print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__

1 Ответ

2 голосов
/ 30 мая 2019

Время срабатывания будильника () не гарантируется с точностью до секунды. Стандартный модуль Time :: HiRes предлагает ualarm () и повторную реализацию alarm () в терминах ualarm (), которые могут быть ближе к тому, что вы хотите. Решение вашей проблемы может быть так же просто, как добавление

use Time::HiRes qw(alarm);

в начало вашей программы.

...