Существует ли какая-либо программа cli, которая может проксировать канал и генерировать код завершения сбоя, если пропускная способность ниже определенного порога? - PullRequest
0 голосов
/ 28 января 2020

Часто у меня есть типичный код ETL, который выглядит следующим образом

./call_some_api.py | ./extract_transform | ./load_to_some_sql

Что если один из первых двух сценариев прекратит отправку байтов из-за какой-то внутренней ошибки, которая приводит к их остановке. У меня sh была другая программа, которую я мог поставить перед ./load_to_some_sql, которая обнаружит, что 0 байт было отправлено за 5 минут, и выдаст код выхода.

У Curl есть --speed-limit и --speed-time которые делают именно это, но это не обобщено для других приложений Cli.

Есть ли способ сделать это? Есть ли способ взломать sh, если пропускная способность достигает определенного уровня в трубе? Я знаю, что конечные автоматы и другие инструменты оркестровки могут сделать это, но в целом, если есть способ сделать это из bash, это было бы полезно!

Ответы [ 2 ]

2 голосов
/ 29 января 2020

Если вас интересует тайм-аут неактивности, они могут это сделать.

Один вкладыш (упоминается в комментариях). Отрегулируйте $t для секунд бездействия:

perl -e'$t=300;$SIG{ALRM}=sub{die"Timeout\n"};alarm$t;while(<>){print;alarm$t}'

Более устойчивый:

#include <sys/epoll.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <signal.h>

long secs = 300;
int stdin_mode, stdout_mode;

void my_exit(int code) {
    fcntl(STDIN_FILENO, F_SETFL, stdin_mode);
    fcntl(STDOUT_FILENO, F_SETFL, stdout_mode);
    exit(code);
}

char* progname = "";
void perror_die(char* msg) {
    dprintf(STDERR_FILENO, "%s: %s: %m\n", progname, msg);
    my_exit(EXIT_FAILURE);
}

int check(int ret, char *msg) {
    if (ret == -1) perror_die(msg);
    return ret;
}

void usage() {
    dprintf(STDERR_FILENO, "Usage:\n-s <secs> inactivity timeout in seconds (default: %ld seconds)\n", secs);
    exit(EXIT_SUCCESS);
}

int main(int argc, char* argv[]) {
    progname = argv[0];
    signal(SIGHUP, SIG_IGN);
    stdin_mode = fcntl(STDIN_FILENO, F_GETFL);
    stdout_mode = fcntl(STDOUT_FILENO, F_GETFL);

    int opt; 
    while((opt = getopt(argc, argv, "s:h")) != -1)
        switch(opt) {
            case 's':
                secs = strtol(optarg, NULL, 10);
                break;
            case 'h':
            case '?':
                usage();             
        }; 
    if (optind < argc) usage();

    int epfd = check(epoll_create(3), "epoll_create");

    struct epoll_event ev = { 
        .events = EPOLLIN | EPOLLET,
        .data.fd = STDIN_FILENO
    };

    check(epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev), "epoll_ctl");
    check(fcntl(STDIN_FILENO, F_SETFL, stdin_mode | O_NONBLOCK), "fcntl stdin");

    ev.events = 0;
    ev.data.fd = STDOUT_FILENO;
    int epout = !epoll_ctl(epfd, EPOLL_CTL_ADD, STDOUT_FILENO, &ev);
    if (epout)
        check(fcntl(STDOUT_FILENO, F_SETFL, stdout_mode | O_NONBLOCK), "fcntl stdout");

    char buf[4096];

    ssize_t b_read = 0, b_wrote = 0, bytes = 0;

    secs *= 1000;
    int epres;
    while((epres = epoll_wait(epfd, &ev, 1, secs)) == 1) {
        if (ev.events & EPOLLERR) {
            errno = EPIPE;
            break;
        }
        for(;;) {
            if (b_read == 0) {
                b_read = read(STDIN_FILENO, buf, sizeof(buf));
                if (b_read > 0) bytes += b_read;
                else if (b_read == -1) {
                    if (errno == EAGAIN) {
                        b_read = 0;
                        break;
                    } else perror_die("read");
                } else my_exit(EXIT_SUCCESS);
            }
            if (b_read) {
                int w = write(STDOUT_FILENO, buf + b_wrote, b_read - b_wrote);
                if (w != -1) b_wrote += w;
                else {
                    if (errno == EAGAIN && epout) {
                        ev.events = EPOLLOUT | EPOLLONESHOT;
                        ev.data.fd = STDOUT_FILENO;
                        check(epoll_ctl(epfd, EPOLL_CTL_MOD, STDOUT_FILENO, &ev), "epoll_ctl");
                        break;
                    } else perror_die("write");
                }
                if (b_wrote == b_read) b_read = b_wrote = 0;
            }
        }
    }
    if (epres) perror_die("event loop");
    dprintf(STDERR_FILENO, "%s: Timeout reached\n", progname);
    my_exit(EXIT_FAILURE);
}

Если вы заинтересованы в измерении байтов / сек, это можно сделать, на интервал: (вычисление может быть улучшено, чтобы сохранить пропускную способность в течение n секунд вместо интервала)

#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/timerfd.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <signal.h>

ssize_t min_bw = 1000;
long secs = 5;

int stdin_mode, stdout_mode;

void my_exit(int code) {
    fcntl(STDIN_FILENO, F_SETFL, stdin_mode);
    fcntl(STDOUT_FILENO, F_SETFL, stdout_mode);
    exit(code);
}

char* progname = "";
void perror_die(char* msg) {
    dprintf(STDERR_FILENO, "%s: %s: %m\n", progname, msg);
    my_exit(EXIT_FAILURE);
}

int check(int ret, char *msg) {
    if (ret == -1) perror_die(msg);
    return ret;
}

void usage() {
    dprintf(STDERR_FILENO, "Usage:\n-b <bytes> minimum bytes per (default: %ld bytes)\n-s <secs> seconds (default: %ld seconds)\n", min_bw, secs);
    exit(EXIT_SUCCESS);
}

int main(int argc, char* argv[]) {
    progname = argv[0];
    signal(SIGHUP, SIG_IGN);
    stdin_mode = fcntl(STDIN_FILENO, F_GETFL);
    stdout_mode = fcntl(STDOUT_FILENO, F_GETFL);

    int opt; 
    while((opt = getopt(argc, argv, "b:s:h")) != -1)
        switch(opt) {
            case 'b':
                min_bw = strtol(optarg, NULL, 10);
                break;
            case 's':
                secs = strtol(optarg, NULL, 10);
                break;
            case 'h':
            case '?':
                usage();             
        }; 
    if (optind < argc) usage();

    int epfd = check(epoll_create(3), "epoll_create");

    int tmfd = check(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK), "timerfd_create");
    struct itimerspec when = {{secs, 0}, {secs, 0}};
    check(timerfd_settime(tmfd, 0, &when, NULL), "timerfd_settime");

    struct epoll_event ev = { 
        .events = EPOLLIN | EPOLLET,
        .data.fd = tmfd
    };
    check(epoll_ctl(epfd, EPOLL_CTL_ADD, tmfd, &ev), "epoll_ctl");

    ev.data.fd = STDIN_FILENO;
    check(epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev), "epoll_ctl");
    check(fcntl(STDIN_FILENO, F_SETFL, stdin_mode | O_NONBLOCK), "fcntl stdin");

    ev.events = 0;
    ev.data.fd = STDOUT_FILENO;
    int epout = !epoll_ctl(epfd, EPOLL_CTL_ADD, STDOUT_FILENO, &ev);
    if (epout)
        check(fcntl(STDOUT_FILENO, F_SETFL, stdout_mode | O_NONBLOCK), "fcntl stdout");

    char tmbuf[64];
    char buf[4096];

    ssize_t b_read = 0, b_wrote = 0, bytes = 0;

    int epres;
    while((epres = epoll_wait(epfd, &ev, 1, -1)) == 1) {
        if (ev.events & EPOLLERR) {
            errno = EPIPE;
            break;
        }
        if (ev.data.fd == tmfd) {
            if (bytes < min_bw) {
                dprintf(STDERR_FILENO, "Too slow\n");
                my_exit(EXIT_FAILURE);
            }
            check(read(tmfd, tmbuf, sizeof(tmbuf)), "read timerfd");
            bytes = 0;
            continue;
        }
        for(;;) {
            if (b_read == 0) {
                b_read = read(STDIN_FILENO, buf, sizeof(buf));
                if (b_read > 0) bytes += b_read;
                else if (b_read == -1) {
                    if (errno == EAGAIN) {
                        b_read = 0;
                        break;
                    } else perror_die("read");
                } else my_exit(EXIT_SUCCESS);
            }
            if (b_read) {
                int w = write(STDOUT_FILENO, buf + b_wrote, b_read - b_wrote);
                if (w != -1) b_wrote += w;
                else {
                    if (errno == EAGAIN && epout) {
                        ev.events = EPOLLOUT | EPOLLONESHOT;
                        ev.data.fd = STDOUT_FILENO;
                        check(epoll_ctl(epfd, EPOLL_CTL_MOD, STDOUT_FILENO, &ev), "epoll_ctl");
                        break;
                    } else perror_die("write");
                }
                if (b_wrote == b_read) b_read = b_wrote = 0;
            }
        }
    }
    perror_die("event loop");
}

Обе программы предназначены только для linux.

1 голос
/ 29 января 2020

При условии, что у вас есть новая sh версия Bash, встроенная функция read может опционально устанавливать тайм-аут [ docs ], что означает, что на самом деле не сложно написать немного чистого Bash код, который в точности соответствует тому, что вы описываете:

... upstream command ... \
| {
  while true ; do
    read -r -N 1 -t 300 char
    result=$?
    if (( result > 128 )) ; then
      echo 'Read timed out.' >&2
      exit 1
    elif (( result > 0 )) ; then
      exit 0
    elif [[ "$char" == '' ]] ; then
      printf '\0'
    else
      printf %s "$char"
    fi
  done
} \
| ... downstream command ...

(Примечание: выше для простоты читается один символ за раз; если вы ожидаете, что через это пройдет большой объем данных, то вы возможно, потребуется скорректировать его для лучшей производительности.)

Но даже несмотря на то, что вышеприведенное делает именно то, что вы описываете, я не уверен, что оно действительно достигнет вашей цели, потому что команда upstream не будет на самом деле d ie, пока он не попытается что-то написать и не получит SIGPIPE. Так что даже после вышеприведенных отпечатков Read timed out, Bash будет просто висеть бесконечно, вероятно, до тех пор, пока пользователь не нажмет Ctrl- C. (То же самое относится и к ответу Нири, конечно.) Это немного сложнее исправить; Я предполагаю, что когда ваша команда обнаруживает тайм-аут, ей нужно будет найти идентификаторы процессов вышестоящих команд и использовать kill для проактивной отправки им SIGPIPE (или другого выбранного вами сигнала)?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...