Linux неблокирующая fifo (регистрация по требованию) - PullRequest
31 голосов
/ 09 сентября 2011

Мне нравится регистрировать вывод программ «по требованию». Например. выход регистрируется на терминале, но другой процесс может подключить токовый выход в любое время.

Классический способ будет:

myprogram 2>&1 | tee /tmp/mylog

и по запросу

tail /tmp/mylog

Однако это приведет к созданию постоянно растущего файла журнала, даже если он не используется до тех пор, пока на диске не закончится свободное место. Итак, моя попытка была:

mkfifo /tmp/mylog
myprogram 2>&1 | tee /tmp/mylog

и по требованию

cat /tmp/mylog

Теперь я могу читать / tmp / mylog в любое время. Однако любой вывод блокирует программу, пока не будет прочитан / tmp / mylog. Мне нравится fifo, чтобы сбрасывать любые входящие данные, не считанные назад. Как это сделать?

Ответы [ 9 ]

46 голосов
/ 01 октября 2011

Вдохновленный вашим вопросом, я написал простую программу, которая позволит вам сделать это:

$ myprogram 2>&1 | ftee /tmp/mylog

Он ведет себя аналогично tee, но клонирует stdin в stdout ив именованную трубу (требование пока) без блокировки.Это означает, что если вы хотите вести журнал таким образом, может случиться так, что вы потеряете данные журнала, но я думаю, что это приемлемо в вашем сценарии.Хитрость заключается в том, чтобы блокировать сигнал SIGPIPE и игнорировать ошибку при записи в поврежденный fifo.Этот образец, конечно, может быть оптимизирован различными способами, но, на мой взгляд, он выполняет свою работу.

/* ftee - clone stdin to stdout and to a named pipe 
(c) racic@stackoverflow
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;

    signal(SIGPIPE, SIG_IGN);

    if(2!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if(-1==readfd)
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status))
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode))
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if(-1==writefd)
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while(1)
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        bytes = write(STDOUT_FILENO, buffer, bytes);
        if(-1==bytes)
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors
    }
    close(writefd); 
    return(0);
}

Вы можете скомпилировать его с помощью этой стандартной команды:

$ gcc ftee.c -o ftee

Вы можете быстро проверить это, запустив, например:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

Также обратите внимание - это не мультиплексор.Вы можете иметь только один процесс, выполняющий $ cat /tmp/mylog за один раз.

11 голосов
/ 03 июня 2015

Это (очень) старый поток, но я столкнулся с подобной проблемой в последнее время. На самом деле мне нужно было клонировать стандартный ввод в стандартный вывод с копией в канал, который не блокируется. предложенный ftee в первом ответе действительно помог там, но был (для моего случая использования) слишком изменчивым. Это означает, что я потерял данные, которые мог бы обработать, если бы получил их вовремя.

Сценарий, с которым я столкнулся, заключается в том, что у меня есть процесс (some_process), который агрегирует некоторые данные и записывает их результаты каждые три секунды в стандартный вывод. (Упрощенная) установка выглядела так (в реальной установке я использую именованный канал):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Теперь raw_data.gz должен быть сжат и должен быть завершен. Ftee делает эту работу очень хорошо. Но канал, который я использую в середине, был слишком медленным, чтобы захватить сброшенные данные - но он был достаточно быстрым, чтобы обработать все, если смог, - это было проверено с помощью обычной тройника. Тем не менее, нормальный тройник блокируется, если что-то случится с неназванным каналом, и, поскольку я хочу иметь возможность подключаться по требованию, тройник не вариант. Вернуться к теме: стало лучше, когда я поместил буфер между ними, в результате:

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

Но это все еще потеряло данные, которые я мог обработать. Поэтому я пошел дальше и расширил предложенный ранее ftee до буферизованной версии (bftee). Он по-прежнему имеет все те же свойства, но использует (неэффективный?) Внутренний буфер в случае сбоя записи. Он по-прежнему теряет данные, если буфер заполнен, но в моем случае это прекрасно работает. Как всегда, есть много возможностей для улучшения, но, поскольку я скопировал отсюда код, я хотел бы поделиться им с людьми, которые могут его использовать.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) racic@stackoverflow
    (c) fabraxias@stackoverflow
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
    {   
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
        {   
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;
        }

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        if(-1==readfd)
        {   
            perror("bftee: readfd: open()");
            exit(EXIT_FAILURE);
        }

        if(-1==fstat(readfd, &status))
        {
            perror("bftee: fstat");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        if(!S_ISFIFO(status.st_mode))
        {
            printf("bftee: %s in not a fifo!\n", fifonam);
            close(readfd);
            exit(EXIT_FAILURE);
        }

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        if(-1==writefd)
        {
            perror("bftee: writefd: open()");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        close(readfd);


        InitQueue(&queue, bufferSize);
        quit = 0;

        while(!quit)
        {
            // read from STDIN
            bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
            queue.sWrites++;

            if(-1==bytes) {
                perror("ftee: writing to stdout");
                break;
            }

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
                 DelFromQueue(&queue);
                 queue.pWrites++;
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
               }
            }
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);
            else 
              queue.pWrites++;
        }

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if (queue.active > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);
           }
        }

        close(writefd);

    }


    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;
    }

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)
    {

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
        {
            queue->active++;
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;
            queue->drops++;
        }
    }

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
        return &(queue->data[queue->start]);
    }

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);
    }

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
      };

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
      }

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
        return;
      }
    }

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
    {
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;
          queue->active--;
        }
    }

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
    }

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      quit++;
      if (quit > 1) exit(EXIT_FAILURE);
    }

Эта версия принимает еще один (необязательный) аргумент, который указывает количество блоков, которые должны быть буферизованы для канала. Мой пример звонка теперь выглядит так:

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

, что приводит к буферизации 16384 блоков перед сбросом. это использует примерно на 32 Мбайт больше памяти, но ... кого это волнует?

Конечно, в реальной среде я использую именованный канал, чтобы я мог присоединять и отсоединять по мере необходимости. Вот так выглядит:

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results

Кроме того, процесс реагирует на сигналы следующим образом: SIGUSR1 -> печать счетчиков в STDERR SIGTERM, SIGINT -> первый выходит из основного цикла и сбрасывает буфер в канал, второй немедленно завершает программу.

Может быть, это поможет кому-то в будущем ... Наслаждайтесь

8 голосов
/ 09 сентября 2011

Однако это приведет к созданию постоянно растущего файла журнала, даже если он не используется до тех пор, пока на диске не останется свободного места.

Почему бы не периодически вращать журналы?Есть даже программа, которая сделает это за вас logrotate.

Также есть система для генерации сообщений журнала и выполнения с ними разных действий в зависимости от типа.Это называется syslog.

Вы можете даже объединить их.Пусть ваша программа генерирует сообщения syslog, сконфигурирует syslog для помещения их в файл и использует logrotate, чтобы они не заполняли диск.


Если выяснилось, что вы писали для небольшой встроенной системыи вывод программы большой, есть множество методов, которые вы могли бы рассмотреть.

  • Удаленный системный журнал: отправьте сообщения системного журнала на сервер системного журнала в сети.
  • Используйте уровни серьезностидоступны в системном журнале, чтобы делать разные вещи с сообщениями.Например, отменить «INFO», но войти и переслать «ERR» или выше.Например, в консоль
  • Используйте обработчик сигналов в вашей программе, чтобы перечитать конфигурацию в HUP и таким образом изменить генерацию журнала «по требованию».
  • Пусть ваша программа прослушивает сокет Unix и записывает сообщенияэто когда открыто.Вы даже можете внедрить и интерактивную консоль в свою программу таким образом.
  • Используя файл конфигурации, обеспечьте детальный контроль над выходом журнала.
6 голосов
/ 10 сентября 2011

BusyBox, часто используемый на встроенных устройствах, может создавать буферизованный журнал оперативной памяти с помощью

syslogd -C

, которое может быть заполнено

logger

и прочитано

logread

Работает довольно хорошо, но предоставляет только один глобальный журнал.

5 голосов
/ 05 октября 2015

Кажется, что оператор перенаправления bash <> ( 3.6.10 Открытие файловых дескрипторов для чтения и записиSee ) делает запись в файл / fifo, открытый с его неблокирующим действием. Это должно работать:

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Решение, данное gniourf_gniourf на канале #bash IRC.

4 голосов
/ 09 сентября 2011

Если вы можете установить экран на встроенное устройство, то можете запустить в нем «myprogram», отсоединить его и заново подключить в любое время, когда захотите посмотреть журнал.Что-то вроде:

$ screen -t sometitle myprogram
Hit Ctrl+A, then d to detach it.

Всякий раз, когда вы хотите увидеть вывод, подключите его снова:

$ screen -DR sometitle
Hit Ctrl-A, then d to detach it again.

Таким образом, вам не придется беспокоиться о выводе программы с использованием дискового пространства вообще.

3 голосов
/ 10 сентября 2011

Проблема с данным подходом fifo состоит в том, что все это будет зависать, когда буфер канала заполняется и процесс чтения не происходит.

Для подхода fifo к работе Iдумаю, вам нужно будет реализовать модель клиент-сервер с именованным каналом, аналогичную модели, упомянутой в BASH: лучшая архитектура для чтения из двух входных потоков (см. слегка измененный код ниже, пример кода 2).

В качестве обходного пути вы могли бы также использовать конструкцию while ... read вместо tee ввода stdout в именованный канал, реализовав механизм подсчета внутри цикла while ... read, который будет периодически перезаписывать файл журнала на указанное количество строк.,Это предотвратит постоянно растущий файл журнала (пример кода 1).

# sample code 1

# terminal window 1
rm -f /tmp/mylog
touch /tmp/mylog
while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | while IFS="" read -r line; do 
  lno=$((lno+1))
  #echo $lno
  array[${lno}]="${line}"
  if [[ $lno -eq 10 ]]; then
    lno=$((lno+1))
    array[${lno}]="-------------"
    printf '%s\n' "${array[@]}" > /tmp/mylog
    unset lno array
  fi
  printf '%s\n' "${line}"
done

# terminal window 2
tail -f /tmp/mylog


#------------------------


# sample code 2

# code taken from: 
# /4186039/bash-luchshaya-arhitektura-dlya-chteniya-iz-dvuh-vhodnyh-potokov
# terminal window 1

# server
(
rm -f /tmp/to /tmp/from
mkfifo /tmp/to /tmp/from
while true; do 
  while IFS="" read -r -d $'\n' line; do 
    printf '%s\n' "${line}"
  done </tmp/to >/tmp/from &
  bgpid=$!
  exec 3>/tmp/to
  exec 4</tmp/from
  trap "kill -TERM $bgpid; exit" 0 1 2 3 13 15
  wait "$bgpid"
  echo "restarting..."
done
) &
serverpid=$!
#kill -TERM $serverpid

# client
(
exec 3>/tmp/to;
exec 4</tmp/from;
while IFS="" read -r -d $'\n' <&4 line; do
  if [[ "${line:0:1}" == $'\177' ]]; then 
    printf 'line from stdin: %s\n' "${line:1}"  > /dev/null
  else       
    printf 'line from fifo: %s\n' "$line"       > /dev/null
  fi
done &
trap "kill -TERM $"'!; exit' 1 2 3 13 15
while IFS="" read -r -d $'\n' line; do
  # can we make it atomic?
  # sleep 0.5
  # dd if=/tmp/to iflag=nonblock of=/dev/null  # flush fifo
  printf '\177%s\n' "${line}"
done >&3
) &
# kill -TERM $!


# terminal window 2
# tests
echo hello > /tmp/to
yes 1 | nl > /tmp/to
yes 1 | nl | tee /tmp/to
while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | tee -a /tmp/to


# terminal window 3
cat /tmp/to | head -n 10
2 голосов
/ 10 декабря 2015

Если ваш процесс выполняет запись в любой файл журнала, а затем стирает файл и запускается снова и снова, поэтому он не становится слишком большим или использует logrotate.

tail --follow=name --retry my.log

Это все, что вам нужно. Вы получите столько же прокрутки, сколько и ваш терминал.

Ничего нестандартного не требуется. Я не пробовал это с небольшими файлами журналов, но все наши журналы вращаются так, и я никогда не замечал потерянных строк.

0 голосов
/ 15 ноября 2018

Ведение журнала может быть направлено на сокет UDP.Поскольку UDP не требует соединения, он не будет блокировать отправляющую программу.Конечно, журналы будут потеряны, если приемник или сеть не смогут поддерживать работу.

myprogram 2>&1 | socat - udp-datagram:localhost:3333

Тогда, когда вы захотите наблюдать за регистрацией:

socat udp-recv:3333 -

Есть и другие интересные преимуществанапример, возможность одновременного подключения нескольких слушателей или трансляции на несколько устройств.

...