Как разделить ввод сценария оболочки по времени, а не по размеру? - PullRequest
2 голосов
/ 26 мая 2019

В скрипте bash я использую шаблон одного производителя для многих производителей.Производители - это фоновые процессы, записывающие строки в fifo (через GNU Parallel).Потребитель читает все строки из fifo, затем сортирует, фильтрует и печатает отформатированный результат в стандартный вывод.

Однако может потребоваться много времени, чтобы получить полный результат.Производители обычно быстры в первых нескольких результатах, но затем замедляются.Здесь мне больше интересно видеть порции данных каждые несколько секунд, каждая из которых сортируется и фильтруется индивидуально.

mkfifo fifo
parallel ... >"$fifo" &
while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
  process "$chunk"
done

Цикл будет выполняться до тех пор, пока все производители не будут выполнены, и весь ввод не будет прочитан.Каждый блок читается до тех пор, пока не появятся новые данные в течение 5 секунд или пока не пройдет 10 секунд с момента запуска блока.Чанк также может быть пустым, если в течение 10 секунд не было новых данных.

Я пытался заставить его работать так:

output=$(mktemp)
while true; do
  wasTimeout=0 interruptAt=$(( $(date '+%s') + 10 ))
  while true; do
    IFS= read -r -t5 <>"${fifo}"
    rc="$?"
    if [[ "${rc}" -gt 0 ]]; then
      [[ "${rc}" -gt 128 ]] && wasTimeout=1
      break
    fi
    echo "$REPLY" >>"${output}"
    if [[ $(date '+%s') -ge "${interruptAt}" ]]; then
      wasTimeout=1
      break
    fi
  done
  echo '---' >>"${output}"
  [[ "${wasTimeout}" -eq 0 ]] && break
done

Попробовал некоторые варианты этого.В приведенной выше форме он читает первый фрагмент, но затем зацикливается навсегда.Если я использую <"${fifo}" (без чтения / записи, как указано выше), он блокируется после первого блока.Может быть, все это можно упростить с помощью buffer и / или stdbuf?Но оба они определяют блоки по размеру, а не по времени.

Ответы [ 3 ]

2 голосов
/ 04 июня 2019

Это не тривиальная проблема, которую нужно решить.Как я уже говорил, C-программа (или программа на каком-то другом языке программирования, кроме оболочки), вероятно, является лучшим решением.Вот некоторые из усложняющих факторов:

  • Чтение с тайм-аутами.
  • Если данные поступают достаточно быстро, время ожидания меняется.
  • Различные системы имеют разные наборы интервалов временифункции:
    • alarm(), вероятно, доступны везде, но имеют разрешение всего в 1 секунду, что может привести к накопленным ошибкам округления.(Скомпилируйте эту версию с make UFLAGS=-DUSE_ALARM; в macOS используйте make UFLAGS=-DUSE_ALARM LDLIB2=.)
    • setitimer() использует микросекундную синхронизацию и тип struct timeval.(Скомпилируйте эту версию с make UFLAGS=-DUSE_SETITIMER; на macOS скомпилируйте с make UFLAGS=-DUSE_SETITIMER LDLIB2=.)
    • timer_create() и timer_settime() и т. Д. Используйте современный тип наносекундыstruct timespec.Это доступно в Linux;это не доступно на macOS 10.14.5 Mojave или ранее.(Скомпилируйте эту версию с make; она не будет работать в macOS.)

Сообщение об использовании программы:

$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
  -c chunk  Maximum time to wait for data in a chunk (default 10)
  -d delay  Maximum delay after line read (default: 5)
  -f file   Read from file instead of standard input
  -h        Print this help message and exit
  -v        Verbose mode: print timing information to stderr
  -V        Print version information and exit

$

Этот коддоступно в моем репозитории SOQ (Вопросы о переполнении стека) на GitHub в виде файла chunker79.c в подкаталоге src / so-5631-4784 .Вам также понадобится некоторый код поддержки из каталога src / libsoq.

/*
@(#)File:           chunker79.c
@(#)Purpose:        Chunk Reader for SO 5631-4784
@(#)Author:         J Leffler
@(#)Copyright:      (C) JLSS 2019
*/

/*TABSTOP=4*/

/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel).  The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available.  Producers are usually fast on the first few results but
** then would slow down.  Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
**    mkfifo fifo
**    parallel ... >"$fifo" &
**    while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
**      process "$chunk"
**    done
**
** The loop would run until all producers are done and all input is
** read.  Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started.  A chunk may also
** be empty if there was no new data for 10s.
*/

/*
** Analysis
**
** 1.  If no data arrives at all for 10 seconds, then the program should
**     terminate producing no output.  This timeout is controlled by the
**     value of time_chunk in the code.
** 2.  If data arrives more or less consistently, then the collection
**     should continue for 10s and then finish.  This timeout is also
**     controlled by the value of time_chunk in the code.
** 3.  If a line of data arrives before 5 seconds have elapsed, and no
**     more arrives for 5 seconds, then the collection should finish.
**     (If the first line arrives after 5 seconds and no more arrives
**     for more than 5 seconds, then the 10 second timeout cuts in.)
**     This timeout is controlled by the value of time_delay in the code.
** 4.  This means that we want two separate timers at work:
**     - Chunk timer (started when the program starts).
**     - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored.  External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one time, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
**    but these are available on macOS.
** -- Using timer_create(2), timer_destroy(2), timer_settime(2),
**    timer_gettime(2) uses current POSIX function but is not available
**    on macOS.
*/

#include "posixver.h"

#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>

#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */

static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
    "  -c chunk  Maximum time to wait for data in a chunk (default 10)\n"
    "  -d delay  Maximum delay after line read (default: 5)\n"
    "  -f file   Read from file instead of standard input\n"
    "  -h        Print this help message and exit\n"
    "  -v        Verbose mode: print timing information to stderr\n"
    "  -V        Print version information and exit\n"
    ;

static struct timespec time_delay = { .tv_sec =  5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;

static bool verbose = false;

static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);

// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);

#if defined(USE_ALARM)

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(time_chunk.tv_sec);
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    unsigned time_left = alarm(0);
    if (time_left > time_delay.tv_sec)
        alarm(time_delay.tv_sec);
    else
        alarm(time_left);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    alarm(0);
    signal(SIGALRM, SIG_IGN);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#elif defined(USE_SETITIMER)

static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
    return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
    tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_until;
    if (getitimer(ITIMER_REAL, &tv_until) != 0)
        err_syserr("failed to set interval timer: ");
    struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);

    if (verbose)
    {
        char buff1[32];
        fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
        char buff2[32];
        fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
        err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
    }

    if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
        tv_new.it_value = cvt_timespec_to_timeval(time_delay);
        struct itimerval tv_old;
        if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
            err_syserr("failed to set interval timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerval tv_new =
    {
        .it_value    = { .tv_sec = 0, .tv_usec = 0 },
        .it_interval = { .tv_sec = 0, .tv_usec = 0 },
    };
    struct itimerval tv_old;
    if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
        err_syserr("failed to set interval timer: ");
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

#else /* USE_TIMER_GETTIME */

#include "timespec_math.h"

static timer_t t0 = { 0 };

static void set_chunk_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);

    struct sigevent ev =
    {
        .sigev_notify = SIGEV_SIGNAL,
        .sigev_signo = SIGALRM,
        .sigev_value.sival_int = 0,
        .sigev_notify_function = 0,
        .sigev_notify_attributes = 0,
    };
    if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
        err_syserr("failed to create a timer: ");

    struct itimerspec it =
    {
        .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
        .it_value = time_chunk,
    };
    struct itimerspec ot;
    if (timer_settime(t0, 0, &it, &ot) != 0)
        err_syserr("failed to activate timer: ");

    struct sigaction sa;
    sa.sa_handler = alarm_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGALRM, &sa, NULL);
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void set_delay_timeout(void)
{
    if (verbose)
        err_remark("-->> %s()\n", __func__);
    struct itimerspec time_until;
    if (timer_gettime(t0, &time_until) != 0)
        err_syserr("failed to set per-process timer: ");

    char buff1[32];
    fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
    char buff2[32];
    fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
    err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);

    if (cmp_timespec(time_until.it_value, time_delay) <= 0)
    {
        if (verbose)
            err_remark("---- %s(): no need for delay timer\n", __func__);
    }
    else
    {
        struct itimerspec time_new =
        {
            .it_interval = { .tv_sec = 0, .tv_nsec = 0 },
            .it_value = time_delay,
        };
        struct itimerspec time_old;
        if (timer_settime(t0, 0, &time_new, &time_old) != 0)
            err_syserr("failed to set per-process timer: ");
        if (verbose)
            err_remark("---- %s(): set delay timer\n", __func__);
    }
    if (verbose)
        err_remark("<<-- %s()\n", __func__);
}

static void cancel_timeout(void)
{
    if (timer_delete(t0) != 0)
        err_syserr("failed to delete timer: ");
}

#endif /* Timing mode */

/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
    assert(signum == SIGALRM);
    if (verbose)
        err_remark("---- %s(): signal %d\n", __func__, signum);
}

static void read_chunks(FILE *fp)
{
    size_t num_data = 0;
    size_t max_data = 0;
    struct iovec *data = 0;
    size_t buflen = 0;
    char *buffer = 0;
    ssize_t length;
    size_t chunk_len = 0;

    clock_gettime(CLOCK_REALTIME, &time_start);

    set_chunk_timeout();
    while ((length = getline(&buffer, &buflen, fp)) != -1)
    {
        if (num_data >= max_data)
        {
            size_t new_size = (num_data * 2) + 2;
            void *newspace = realloc(data, new_size * sizeof(data[0]));
            if (newspace == 0)
                err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
            data = newspace;
            max_data = new_size;
        }
        data[num_data].iov_base = buffer;
        data[num_data].iov_len = length;
        num_data++;
        if (verbose)
            err_remark("Received line %zu\n", num_data);
        chunk_len += length;
        buffer = 0;
        buflen = 0;
        set_delay_timeout();
    }
    cancel_timeout();

    if (chunk_len > 0)
    {
        if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
            err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
        else if ((size_t)length != chunk_len)
            err_error("failed to write %zu bytes to standard output "
                      "(short write of %zu bytes)\n", chunk_len, (size_t)length);
    }

    if (verbose)
        err_remark("---- %s(): data written (%zu bytes)\n", __func__, length);

    for (size_t i = 0; i < num_data; i++)
        free(data[i].iov_base);
    free(data);
    free(buffer);
}

int main(int argc, char **argv)
{
    const char *name = "(standard input)";
    FILE *fp = stdin;
    err_setarg0(argv[0]);
    err_setlogopts(ERR_MICRO);

    int opt;
    while ((opt = getopt(argc, argv, optstr)) != -1)
    {
        switch (opt)
        {
        case 'c':
            if (scn_timespec(optarg, &time_chunk) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'd':
            if (scn_timespec(optarg, &time_delay) != 0)
                err_error("Failed to convert '%s' into a time value\n", optarg);
            break;
        case 'f':
            if ((fp = fopen(optarg, "r")) == 0)
                err_syserr("Failed to open file '%s' for reading: ", optarg);
            name = optarg;
            break;
        case 'h':
            err_help(usestr, hlpstr);
            /*NOTREACHED*/
        case 'v':
            verbose = true;
            break;
        case 'V':
            err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
            /*NOTREACHED*/
        default:
            err_usage(usestr);
            /*NOTREACHED*/
        }
    }

    if (optind != argc)
        err_usage(usestr);

    if (verbose)
    {
        err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
        err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
        err_remark("file:  %s\n", name);
    }

    read_chunks(fp);

    return 0;
}

В моем репозитории SOQ также есть скрипт gen-data.sh, который использует некоторые пользовательские программы для генерации потока данных, такого как этот(начальное значение записывается в стандартную ошибку, а не в стандартный вывод):

$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$

При подаче в chunker79 с параметрами по умолчанию, я получаю вывод как:

$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$

Если выпроанализируйте временные интервалы (посмотрите на первые два поля в выходных строках), чтобы выходные данные соответствовали спецификации.Еще более подробный анализ представлен следующим образом:

$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000]  -  5.026s  -  chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00]  -  7.838s  -  gen-data.sh
$

В этой настройке наблюдается заметная пауза между появлением выхода из chunker79 и завершением gen-data.sh.Это связано с тем, что Bash ожидает завершения всех процессов в конвейере, а gen-data.sh не завершается до следующей записи в канал после сообщения, заканчивающего chunker79.Это артефакт этой тестовой установки;это не будет фактором в сценарии оболочки, описанном в вопросе.

0 голосов
/ 27 мая 2019

Примерно так:

#!/usr/bin/perl

$timeout = 3;
while(<STDIN>) {
    # Make sure there is some input                                                      
    push @out,$_;
    eval {
        local $SIG{ALRM} = sub { die };
        alarm $timeout;
        while(<STDIN>) {
            alarm $timeout;
            push @out,$_;
        }
        alarm 0;
    };
    system "echo","process",@out;
}
0 голосов
/ 26 мая 2019

Я хотел бы написать безопасную многопоточную программу с очередями.

Я знаю Java лучше, но могут быть более современные подходящие языки, такие как Go и Kotlin.

...