перекрывающийся вывод при чтении с fifo: как это исправить / избежать? - PullRequest
3 голосов
/ 25 апреля 2019

Я пытаюсь объединить данные из 2 файлов, поэтому я решил отправить данные через отдельные процессы записи в именованный fifo и запустил отдельный процесс чтения для чтения и обработки агрегированных данных.Все чтение / запись происходит на виртуальном диске (/ dev / shm), который обычно имеет большой размер около 100 гигабайт.

Этот рабочий файл, и я гарантировал, что каждая из строк данных, записываемых в fifo, составляет менее 512 байтовпоэтому труба может сохранять свое атомарное поведение.

Но после опробования нескольких прогонов я пришел к выводу, что процесс чтения получает перекрывающийся вывод, и это начинает происходить, когда я пытаюсь передать более 10 миллионов строк из каждогопроцесс.Каждая из моих строк данных заканчивается новой строкой.

Я открываю fifo в режиме «+ > fifo» для записи.Здесь не используются системные вызовы, просто используется обычное открывание для получения дескриптора файла и попытки построчно обрабатывать данные.

Как я могу начать исследовать это.Есть идеи?

Большое спасибо.

обновление как на 2019 / APR / 29 :

Обратите внимание, что мои циклы теперь используют системные вызовы.Раньше я ими не пользовался, но в конце концов решил их использовать.

Этого же можно добиться, если 2 процесса записывают в один файл, но нужно соблюдать осторожность, так как это будет работать только с POSIX-совместимым файлом.систем ИЛИ, если у кого-то его нет - можно сохранить файл журнала (где несколько процессов будут выполнять запись) в RAMDISK, так как он также работает.Диски NFS не входят в сферу применения, поскольку они не совместимы с POSIX, и этот метод на них не работает.

Так что, если мы говорим о FIFO против текстового файла - несколько процессов чтения / записи в файл быстрее, чем несколько процессов чтения / записи в FIFO.

Просто для будущих читателей, вотмой код процесса писатель и читатель.Как вы разрабатываете свой код для включения этих подпрограмм, зависит от вас.Множество способов сделать это.

Надеюсь, что это было полезно.

Процесс записи

  write_log => sub {
    my ($filehandle, $log_message) = @_;
    select $filehandle ; $|++;
    syswrite ($filehandle, $log_message, length($log_message))
      or die "write_log: syswrite fail!\n";
  },

Процесс чтения:

  read_log => sub
  {
    # In my endless reading loop,
    # if I detect keyword END 2 times (as 
    # i have 2 processes), I exit the reading loop
    # and do further operations.
    #
    my ($end_check_value) = @_;

    sysopen (FH,$logfile, O_CREAT|O_RDONLY)
      or die "($$) read_log: Failed to sysopen\n";

    my ($h, $end) = (undef,0);

    select FH ; $|++ ;

    print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";

    for (;;)
    {
      while (my $line = <FH>)
      {
        chomp $line;
        $end++ if $line =~ m/END/g;
        last if $end == $end_check_value;
        my $key = (split(/\s/,$line))[0];
        $h->{$key}++;
      }

      sleep(1) ; seek (FH,0,1);

      # break out of for loop if we
      # have collected the 'END' tags
      # from all worker processes
      if ($end == $end_check_value)
      {
        print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
                     "with end_check: $end_check_value\n";
        last;
      }
    } close (FH);
  },

Статистика производительности:

Вот статистика производительности для нескольких процессов, записывающих в один файл в RAMDISK.На avrage требуется около 10 минут плюс минус 20 секунд, чтобы записать 150 000 000 строк (150 млн) и затем прочитать их в хеш.

test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!

real    **10m13.466s**
user    9m31.627s
sys     0m39.650s

Вот статистика производительности для FIFO, где несколько процессов записывают по 25 000 000 строк в каждуюFIFO и процесс чтения читают их обратно в хеш.В среднем это заняло около 25-30 минут.Это медленнее, чем процессы записи в файл.

test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
        s1 = 25000000
        s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..

Ответы [ 2 ]

1 голос
/ 25 апреля 2019

Возможно, вам придется включить автозапуск для файла, в который вы пишете. Если вы открываете файлы с помощью функции open (), а не через интерфейс OO, такой как IO :: File, то после того, как вы успешно откроете файл (скажем, в $ fifo), вам понадобится такой код.

select $fifo;
$| = 1;

Обратите внимание, что select () выбирает выходной дескриптор файла для распечаток и тому подобное, которые не указывают конкретный файловый дескриптор. Если вы хотите вернуться к цели STDOUT, то select STDOUT после вышеупомянутого или педантичный:

my $oldfh = select $fifo;
$| = 1;
select $oldfh;

Я не думаю, что файловые режимы ('+ <' и т. Д.) Имеют к этому какое-либо отношение, поскольку такие понятия, как "клоббер" и "добавление", не применяются к FIFO. Вы, вероятно, сделали бы то же самое с простыми ">" и "<". </p>

0 голосов
/ 25 апреля 2019

Возможно, что то, что вы видите здесь, является простым продуктом параллелизма.Вы предполагаете, что читатель извлекает данные из FIFO своевременно.Что, если оба автора имеют возможность написать несколько записей, прежде чем читатель получит еще один шанс на чтение?Что если FIFO достигнет максимальной производительности в процессе записи?Автор будет частично блокировать запись, и тогда у читателя будет возможность очистить очередь, но нет гарантии, что автор, который написал частичную строку, будет следующим для записи.Это может привести к появлению чередующихся строк.

Если мой ответ относительно автоматической очистки не решит вашу проблему, вам, возможно, придется рассмотреть возможность чередования записей таким образом.

Как упоминалось вкомментарий выше, вам может быть лучше использовать дейтаграммные сокеты (SOCK_DGRAM), а не FIFO.Таким образом, каждое сообщение представляет собой атомную единицу без возможности чередования.

...