Это не тривиальная проблема, которую нужно решить.Как я уже говорил, C-программа (или программа на каком-то другом языке программирования, кроме оболочки), вероятно, является лучшим решением.Вот некоторые из усложняющих факторов:
- Чтение с тайм-аутами.
- Если данные поступают достаточно быстро, время ожидания меняется.
- Различные системы имеют разные наборы интервалов временифункции:
alarm()
, вероятно, доступны везде, но имеют разрешение всего в 1 секунду, что может привести к накопленным ошибкам округления.(Скомпилируйте эту версию с make UFLAGS=-DUSE_ALARM
; в macOS используйте make UFLAGS=-DUSE_ALARM LDLIB2=
.) setitimer()
использует микросекундную синхронизацию и тип struct timeval
.(Скомпилируйте эту версию с make UFLAGS=-DUSE_SETITIMER
; на macOS скомпилируйте с make UFLAGS=-DUSE_SETITIMER LDLIB2=
.) timer_create()
и timer_settime()
и т. Д. Используйте современный тип наносекундыstruct timespec
.Это доступно в Linux;это не доступно на macOS 10.14.5 Mojave или ранее.(Скомпилируйте эту версию с make
; она не будет работать в macOS.)
Сообщение об использовании программы:
$ chunker79 -h
Usage: chunker79 [-hvV][-c chunk][-d delay][-f file]
-c chunk Maximum time to wait for data in a chunk (default 10)
-d delay Maximum delay after line read (default: 5)
-f file Read from file instead of standard input
-h Print this help message and exit
-v Verbose mode: print timing information to stderr
-V Print version information and exit
$
Этот коддоступно в моем репозитории SOQ (Вопросы о переполнении стека) на GitHub в виде файла chunker79.c
в подкаталоге src / so-5631-4784 .Вам также понадобится некоторый код поддержки из каталога src / libsoq.
/*
@(#)File: chunker79.c
@(#)Purpose: Chunk Reader for SO 5631-4784
@(#)Author: J Leffler
@(#)Copyright: (C) JLSS 2019
*/
/*TABSTOP=4*/
/*
** Problem specification from the Stack Overflow question
**
** In a bash script I am using a many-producer single-consumer pattern.
** Producers are background processes writing lines into a fifo (via GNU
** Parallel). The consumer reads all lines from the fifo, then sorts,
** filters, and prints the formatted result to stdout.
**
** However, it could take a long time until the full result is
** available. Producers are usually fast on the first few results but
** then would slow down. Here I am more interested to see chunks of
** data every few seconds, each sorted and filtered individually.
**
** mkfifo fifo
** parallel ... >"$fifo" &
** while chunk=$(read with timeout 5s and at most 10s <"$fifo"); do
** process "$chunk"
** done
**
** The loop would run until all producers are done and all input is
** read. Each chunk is read until there has been no new data for 5s, or
** until 10s have passed since the chunk was started. A chunk may also
** be empty if there was no new data for 10s.
*/
/*
** Analysis
**
** 1. If no data arrives at all for 10 seconds, then the program should
** terminate producing no output. This timeout is controlled by the
** value of time_chunk in the code.
** 2. If data arrives more or less consistently, then the collection
** should continue for 10s and then finish. This timeout is also
** controlled by the value of time_chunk in the code.
** 3. If a line of data arrives before 5 seconds have elapsed, and no
** more arrives for 5 seconds, then the collection should finish.
** (If the first line arrives after 5 seconds and no more arrives
** for more than 5 seconds, then the 10 second timeout cuts in.)
** This timeout is controlled by the value of time_delay in the code.
** 4. This means that we want two separate timers at work:
** - Chunk timer (started when the program starts).
** - Delay timer (started each time a line is read).
**
** It doesn't matter which timer goes off, but further timer signals
** should be ignored. External signals will confuse things; tough!
**
** -- Using alarm(2) is tricky because it provides only one time, not two.
** -- Using getitimer(2), setitimer(2) uses obsolescent POSIX functions,
** but these are available on macOS.
** -- Using timer_create(2), timer_destroy(2), timer_settime(2),
** timer_gettime(2) uses current POSIX function but is not available
** on macOS.
*/
#include "posixver.h"
#include "stderr.h"
#include "timespec_io.h"
#include <assert.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>
#ifdef USE_SETITIMER
#include "timeval_math.h"
#include "timeval_io.h"
#include <sys/time.h>
#endif /* USE_SETITIMER */
static const char optstr[] = "hvVc:d:f:";
static const char usestr[] = "[-hvV][-c chunk][-d delay][-f file]";
static const char hlpstr[] =
" -c chunk Maximum time to wait for data in a chunk (default 10)\n"
" -d delay Maximum delay after line read (default: 5)\n"
" -f file Read from file instead of standard input\n"
" -h Print this help message and exit\n"
" -v Verbose mode: print timing information to stderr\n"
" -V Print version information and exit\n"
;
static struct timespec time_delay = { .tv_sec = 5, .tv_nsec = 0 };
static struct timespec time_chunk = { .tv_sec = 10, .tv_nsec = 0 };
static struct timespec time_start;
static bool verbose = false;
static void set_chunk_timeout(void);
static void set_delay_timeout(void);
static void cancel_timeout(void);
static void alarm_handler(int signum);
// Using signal() manages to set SA_RESTART on a Mac.
// This is allowed by standard C and POSIX, sadly.
// signal(SIGALRM, alarm_handler);
#if defined(USE_ALARM)
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(time_chunk.tv_sec);
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
unsigned time_left = alarm(0);
if (time_left > time_delay.tv_sec)
alarm(time_delay.tv_sec);
else
alarm(time_left);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
alarm(0);
signal(SIGALRM, SIG_IGN);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#elif defined(USE_SETITIMER)
static inline struct timeval cvt_timespec_to_timeval(struct timespec ts)
{
return (struct timeval){ .tv_sec = ts.tv_sec, .tv_usec = ts.tv_nsec / 1000 };
}
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_chunk);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_until;
if (getitimer(ITIMER_REAL, &tv_until) != 0)
err_syserr("failed to set interval timer: ");
struct timeval tv_delay = cvt_timespec_to_timeval(time_delay);
if (verbose)
{
char buff1[32];
fmt_timeval(&tv_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timeval(&tv_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
}
if (cmp_timeval(tv_until.it_value, tv_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerval tv_new = { { 0, 0 }, { 0, 0 } };
tv_new.it_value = cvt_timespec_to_timeval(time_delay);
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerval tv_new =
{
.it_value = { .tv_sec = 0, .tv_usec = 0 },
.it_interval = { .tv_sec = 0, .tv_usec = 0 },
};
struct itimerval tv_old;
if (setitimer(ITIMER_REAL, &tv_new, &tv_old) != 0)
err_syserr("failed to set interval timer: ");
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
#else /* USE_TIMER_GETTIME */
#include "timespec_math.h"
static timer_t t0 = { 0 };
static void set_chunk_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct sigevent ev =
{
.sigev_notify = SIGEV_SIGNAL,
.sigev_signo = SIGALRM,
.sigev_value.sival_int = 0,
.sigev_notify_function = 0,
.sigev_notify_attributes = 0,
};
if (timer_create(CLOCK_REALTIME, &ev, &t0) < 0)
err_syserr("failed to create a timer: ");
struct itimerspec it =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_chunk,
};
struct itimerspec ot;
if (timer_settime(t0, 0, &it, &ot) != 0)
err_syserr("failed to activate timer: ");
struct sigaction sa;
sa.sa_handler = alarm_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, NULL);
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void set_delay_timeout(void)
{
if (verbose)
err_remark("-->> %s()\n", __func__);
struct itimerspec time_until;
if (timer_gettime(t0, &time_until) != 0)
err_syserr("failed to set per-process timer: ");
char buff1[32];
fmt_timespec(&time_delay, 6, buff1, sizeof(buff1));
char buff2[32];
fmt_timespec(&time_until.it_value, 6, buff2, sizeof(buff2));
err_remark("---- %s(): delay %s, left %s\n", __func__, buff1, buff2);
if (cmp_timespec(time_until.it_value, time_delay) <= 0)
{
if (verbose)
err_remark("---- %s(): no need for delay timer\n", __func__);
}
else
{
struct itimerspec time_new =
{
.it_interval = { .tv_sec = 0, .tv_nsec = 0 },
.it_value = time_delay,
};
struct itimerspec time_old;
if (timer_settime(t0, 0, &time_new, &time_old) != 0)
err_syserr("failed to set per-process timer: ");
if (verbose)
err_remark("---- %s(): set delay timer\n", __func__);
}
if (verbose)
err_remark("<<-- %s()\n", __func__);
}
static void cancel_timeout(void)
{
if (timer_delete(t0) != 0)
err_syserr("failed to delete timer: ");
}
#endif /* Timing mode */
/* Writing to stderr via err_remark() is not officially supported */
static void alarm_handler(int signum)
{
assert(signum == SIGALRM);
if (verbose)
err_remark("---- %s(): signal %d\n", __func__, signum);
}
static void read_chunks(FILE *fp)
{
size_t num_data = 0;
size_t max_data = 0;
struct iovec *data = 0;
size_t buflen = 0;
char *buffer = 0;
ssize_t length;
size_t chunk_len = 0;
clock_gettime(CLOCK_REALTIME, &time_start);
set_chunk_timeout();
while ((length = getline(&buffer, &buflen, fp)) != -1)
{
if (num_data >= max_data)
{
size_t new_size = (num_data * 2) + 2;
void *newspace = realloc(data, new_size * sizeof(data[0]));
if (newspace == 0)
err_syserr("failed to allocate %zu bytes data: ", new_size * sizeof(data[0]));
data = newspace;
max_data = new_size;
}
data[num_data].iov_base = buffer;
data[num_data].iov_len = length;
num_data++;
if (verbose)
err_remark("Received line %zu\n", num_data);
chunk_len += length;
buffer = 0;
buflen = 0;
set_delay_timeout();
}
cancel_timeout();
if (chunk_len > 0)
{
if ((length = writev(STDOUT_FILENO, data, num_data)) < 0)
err_syserr("failed to write %zu bytes to standard output: ", chunk_len);
else if ((size_t)length != chunk_len)
err_error("failed to write %zu bytes to standard output "
"(short write of %zu bytes)\n", chunk_len, (size_t)length);
}
if (verbose)
err_remark("---- %s(): data written (%zu bytes)\n", __func__, length);
for (size_t i = 0; i < num_data; i++)
free(data[i].iov_base);
free(data);
free(buffer);
}
int main(int argc, char **argv)
{
const char *name = "(standard input)";
FILE *fp = stdin;
err_setarg0(argv[0]);
err_setlogopts(ERR_MICRO);
int opt;
while ((opt = getopt(argc, argv, optstr)) != -1)
{
switch (opt)
{
case 'c':
if (scn_timespec(optarg, &time_chunk) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'd':
if (scn_timespec(optarg, &time_delay) != 0)
err_error("Failed to convert '%s' into a time value\n", optarg);
break;
case 'f':
if ((fp = fopen(optarg, "r")) == 0)
err_syserr("Failed to open file '%s' for reading: ", optarg);
name = optarg;
break;
case 'h':
err_help(usestr, hlpstr);
/*NOTREACHED*/
case 'v':
verbose = true;
break;
case 'V':
err_version("CHUNKER79", &"@(#)$Revision$ ($Date$)"[4]);
/*NOTREACHED*/
default:
err_usage(usestr);
/*NOTREACHED*/
}
}
if (optind != argc)
err_usage(usestr);
if (verbose)
{
err_remark("chunk: %3lld.%09ld\n", (long long)time_chunk.tv_sec, time_chunk.tv_nsec);
err_remark("delay: %3lld.%09ld\n", (long long)time_delay.tv_sec, time_delay.tv_nsec);
err_remark("file: %s\n", name);
}
read_chunks(fp);
return 0;
}
В моем репозитории SOQ также есть скрипт gen-data.sh
, который использует некоторые пользовательские программы для генерации потока данных, такого как этот(начальное значение записывается в стандартную ошибку, а не в стандартный вывод):
$ gen-data.sh
# Seed: 1313715286
2019-06-03 23:04:16.653: Zunmieoprri Rdviqymcho 5878 2017-03-29 03:59:15 Udransnadioiaeamprirteo
2019-06-03 23:04:18.525: Rndflseoevhgs Etlaevieripeoetrnwkn 9500 2015-12-18 10:49:15 Ebyrcoebeezatiagpleieoefyc
2019-06-03 23:04:20.526: Nrzsuiakrooab Nbvliinfqidbujoops 1974 2020-05-13 08:05:14 Lgithearril
2019-06-03 23:04:21.777: Eeagop Aieneose 6533 2016-11-06 22:51:58 Aoejlwebbssroncmeovtuuueigraa
2019-06-03 23:04:23.876: Izirdoeektau Atesltiybysaclee 4557 2020-09-13 02:24:46 Igrooiaauiwtna
2019-06-03 23:04:26.145: Yhioit Eamrexuabagsaraiw 9703 2014-09-13 07:44:12 Dyiiienglolqopnrbneerltnmsdn
^C
$
При подаче в chunker79
с параметрами по умолчанию, я получаю вывод как:
$ gen-data.sh | chunker79
# Seed: 722907235
2019-06-03 23:06:20.570: Aluaezkgiebeewal Oyvahee 1022 2015-08-12 07:45:54 Weuababeeduklleym
2019-06-03 23:06:24.100: Gmujvoyevihvoilc Negeiiuvleem 8196 2015-08-29 21:15:15 Nztkrvsadeoeagjgoyotvertavedi
$
Если выпроанализируйте временные интервалы (посмотрите на первые два поля в выходных строках), чтобы выходные данные соответствовали спецификации.Еще более подробный анализ представлен следующим образом:
$ timecmd -mr -- gen-data.sh | timecmd -mr -- chunker79
2019-06-03 23:09:14.246 [PID 57159] gen-data.sh
2019-06-03 23:09:14.246 [PID 57160] chunker79
# Seed: -1077610201
2019-06-03 23:09:14.269: Woreio Rdtpimvoscttbyhxim 7893 2017-03-12 12:46:57 Uywaietirkekes
2019-06-03 23:09:16.939: Uigaba Nzoxdeuisofai 3630 2017-11-16 09:28:59 Jnsncgoesycsevdscugoathusaoq
2019-06-03 23:09:17.845: Sscreua Aloaoonnsuur 5163 2016-08-13 19:47:15 Injhsiifqovbnyeooiimitaaoir
2019-06-03 23:09:19.272 [PID 57160; status 0x0000] - 5.026s - chunker79
2019-06-03 23:09:22.084 [PID 57159; status 0x8D00] - 7.838s - gen-data.sh
$
В этой настройке наблюдается заметная пауза между появлением выхода из chunker79
и завершением gen-data.sh
.Это связано с тем, что Bash ожидает завершения всех процессов в конвейере, а gen-data.sh
не завершается до следующей записи в канал после сообщения, заканчивающего chunker79
.Это артефакт этой тестовой установки;это не будет фактором в сценарии оболочки, описанном в вопросе.