Использование именованных каналов с bash - проблема с потерей данных - PullRequest
14 голосов
/ 27 ноября 2010

Сделал поиск в Интернете, нашел простые «учебные пособия» для использования именованных каналов. Однако, когда я что-то делаю с фоновыми заданиями, я, похоже, теряю много данных.

[[Edit: нашел гораздо более простое решение, см. Ответ на пост. Итак, вопрос, который я поставил, теперь академический - на случай, если кто-то захочет работать на сервере]]

Использование Ubuntu 10.04 с Linux 2.6.32-25-generic # 45-Ubuntu SMP Sat 16 октября 19:52:42 UTC 2010 x86_64 GNU / Linux

GNU bash, версия 4.1.5 (1) -релиз (x86_64-pc-linux-gnu).

Моя функция bash:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

Я запускаю это в фоновом режиме:

> jqs&
[1] 5336

А теперь я его кормлю:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

Вывод не соответствует. Я часто не получаю отголоски успеха. Я получаю столько же новых эхо текста, сколько эхо успеха, иногда меньше.

Если я удаляю '&' из 'ленты', она, кажется, работает, но я блокируюсь до тех пор, пока вывод не будет прочитан. Поэтому я хочу, чтобы подпроцессы были заблокированы, но не основной процесс.

Цель состоит в том, чтобы написать простой сценарий управления заданиями, чтобы я мог выполнять, скажем, максимум 10 заданий параллельно и ставить в очередь остальные для последующей обработки, но достоверно знать, что они выполняются.

Полный менеджер работ ниже:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

Calling

jq <name> <max processes> <command>
jq abc 2 sleep 20

запустит один процесс. Эта часть отлично работает. Начните второй, хорошо. Один за другим, кажется, работает нормально. Но запуск 10 в цикле, похоже, приводит к потере системы, как в более простом примере выше.

Будем весьма благодарны за любые подсказки относительно того, что я могу сделать для решения этой очевидной потери данных МПК.

С уважением, Ален.

Ответы [ 6 ]

26 голосов
/ 27 ноября 2010

Ваша проблема в следующем выражении: if:

while true
do
    if read txt <"$pipe"
    ....
done

В результате сервер очереди заданий каждый раз открывает и закрывает канал.Это означает, что некоторые клиенты получают ошибку «сломанный канал» при попытке записи в канал, то есть читатель канала исчезает после того, как его открывает писатель.

Чтобы это исправить,измените ваш цикл на сервере, откройте канал один раз для всего цикла:

while true
do
    if read txt
    ....
done < "$pipe"

Сделано таким образом, канал открывается один раз и остается открытым.

Вам нужно быть осторожным сто, что вы запускаете внутри цикла, так как вся обработка внутри цикла будет иметь подключенный stdin к именованному каналу.Вы захотите убедиться, что перенаправляете stdin всех своих процессов внутри цикла откуда-то еще, иначе они могут потреблять данные из канала.

Редактировать: Теперь проблема в том, что вы получаете EOF на вашемчитает, когда последний клиент закрывает канал, вы можете использовать jilles метод дублирования файловых дескрипторов, или вы можете просто убедиться, что вы тоже клиент и оставить сторону записи канала открытой:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

Это сохранит сторону записи канала открытой на fd 3. С этим дескриптором файла применимо то же предостережение, что и с stdin.Вам нужно будет закрыть его, чтобы дочерние процессы не наследовали его.Возможно, это имеет меньшее значение, чем с stdin, но будет чище.

6 голосов
/ 28 ноября 2010

Как уже говорилось в других ответах, необходимо всегда держать fifo открытым, чтобы избежать потери данных.

Однако, как только все писатели ушли после того, как fifo был открыт (так что был писатель), чтение сразу возвращается (и poll() возвращает POLLHUP). Единственный способ очистить это состояние - снова открыть fifo.

POSIX не предоставляет решения этой проблемы, но, по крайней мере, Linux и FreeBSD делают: если чтение перестает работать, снова откройте fifo, сохраняя исходный дескриптор открытым. Это работает, потому что в Linux и FreeBSD состояние «зависания» является локальным для определенного описания открытого файла, а в POSIX оно является глобальным для fifo.

Это можно сделать с помощью сценария оболочки, например:

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done
1 голос
/ 28 ноября 2010

Только для тех, кто может быть заинтересован, [[отредактировано]] после комментариев camh и jilles, вот две новые версии скрипта тестового сервера.

Обе версии теперь работают именно так, как и ожидалось.

версия Camh для управления трубами:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

версия Джилла для управления трубами:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

Спасибо всем за помощь.

1 голос
/ 27 ноября 2010

Как Camh & Деннис Уильямсон говорят, не ломайте трубу.

Теперь у меня есть примеры меньшего размера, прямо в командной строке:

Сервер:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

Клиент:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

Может заменить строку ключа на:

    (echo "Test-$i" > tst-fifo&);

Все данные клиента, отправленные в канал, считываются, хотя при втором варианте клиента может потребоваться несколько раз запустить сервер, прежде чем все данные будут прочитаны.

Но хотя чтение ожидает начала данных в канале, после того, как данные были переданы, оно всегда читает пустую строку.

Есть ли способ остановить это?

Спасибо за любые идеи снова.

0 голосов
/ 02 мая 2013

выполнить, скажем, не более 10 параллельных заданий и поставить остальные в очередь для последующей обработки, но точно знать, что они выполняются

Это можно сделать с помощью GNU Parallel.Вам не понадобится этот сценарий.

http://www.gnu.org/software/parallel/man.html#options

Вы можете установить max-procs "Количество рабочих мест. Запуск до N заданий параллельно".Существует возможность установить количество ядер ЦП, которые вы хотите использовать.Вы можете сохранить список выполненных заданий в файле журнала, но это бета-функция.

0 голосов
/ 27 ноября 2010

С одной стороны, проблема хуже, чем я думал: теперь в моем более сложном примере (jq_manage), похоже, есть случай, когда одни и те же данные читаются снова и снова из канала (даже если нет новых данныхзаписывается в него).

С другой стороны, я нашел простое решение (отредактированное после комментария Денниса):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

Работает как шарм.Нет розетки или трубы.Простой.

...