Python: обработка списка аргументов на нескольких серверах с помощью сельдерея - PullRequest
0 голосов
/ 08 мая 2018

Я пытаюсь использовать Celery для обработки списка входных данных. Я хотел бы обработать каждый вход только один раз. Проблема в том, что все мои серверы являются частью кластера суперкомпьютеров. Я могу отправить каждому серверу одну команду для запуска процесса. Как только этот сервер запланирован для выполнения работы для моего имени пользователя (что произойдет в какое-то случайное время в будущем), он запустит этот процесс (поэтому количество серверов, работающих в любой момент времени, не определено). Я бы хотел, чтобы все серверы, выполняющие работу для моего имени пользователя, делились доступной работой, пока вся необходимая работа не будет выполнена.

Я не совсем понимаю, как именно это организовать.

Вот мой app.py, в котором описывается задача, которую должны использовать серверы:

from celery import Celery

app = Celery('tasks',
  backend='redis://localhost:6379/0',
  broker='redis://localhost:6379/0')

@app.task
def add(x, y):
  with open('results.txt', 'a') as out:
    out.write(str(x + y) + '\n')

Вот скрипт, который планирует работу (worker.py):

'''Worker node; executes tasks outlined in app.py'''
from app import add

# run the add function and pass in arguments
for i in range(10000):
  result = add.apply_async(args=[1,i]).get()

На моем локальном компьютере, если я запускаю celery worker -l info -A app в терминале, это запустит приложение сельдерея. Если я затем запускаю worker.py, я вижу, как работа выполняется.

Как я могу заставить несколько разных хостов потреблять незавершенные задачи? Каждый сервер будет иметь доступ к статическому IP-адресу, на котором будет запускаться Redis. Передать ли команду celery worker -l info -A app каждому хосту? Если да, то будет ли каждый хост магическим образом поглощать незавершенную работу, когда она появится в сети? Я был бы чрезвычайно благодарен за любую помощь, которую другие могут предложить с этими вопросами высокого уровня!

1 Ответ

0 голосов
/ 08 мая 2018

Чтобы ответить на вопрос выше, я создал файл с именем app.py и загрузил его в интерфейсный узел, к которому я мог подключиться по ssh. Этот файл описывает функцию, которую будут обрабатывать отдельные работники на разных серверах:

from celery import Celery

app = Celery('tasks',
  backend='redis://daccssfe.crc.nd.edu:6379/0',
  broker='redis://daccssfe.crc.nd.edu:6379/0')

@app.task
def log(*args):

  # have all workers write their results to a common outfile
  with open('/scratch365/dduhaime/celery-test.txt', 'a') as out:
    out.write('-'.join([str(i).strip() for i in args]) + '\n')

Далее я определил функцию schedule_work.py, которая планирует выполняемую работу:

'''Worker node; executes tasks outlined in app.py'''
from app import log

# run the add function and pass in arguments
for i in range(10000):
  print('* processing', i)

  result = log.apply_async(args=[str(i)]).get()

Этот файл создает 10 000 единиц работы и передает каждое целое число 0: 10000-1 в рабочую очередь. Когда работники выходят в сеть, они будут обрабатывать эту очередь.

Чтобы добавить рабочих, я использовал суперкомпьютерную систему моего университета, чтобы создать 10 рабочих, каждый из которых запускает файл app.py, который заставит рабочего потреблять работу из стека. Для этого с помощью системы очередей Sun Grid Engine (суперкомпьютер, над которым я работаю, использует его в качестве протокола отправки задания), я сохранил следующее в файле start_workers.sh:

#!/bin/bash
#$ -N celery
#$ -o logs/celery.log
#$ -t 1-10:1
#$ -pe smp 4
#$ -q long
#$ -r y
source ~/.bash_profile
source celery-env/bin/activate
# add a new worker
celery worker -l info -A app

Затем я отправил эти задания (qsub start_workers.sh), в которых было занято 10 работников, каждый из которых вытягивал из списка выполняемых работ. В конце они все записали адрес своего хоста и аргумент из списка работ, которые необходимо выполнить, в запрошенный файл, к которому у них всех был доступ. Как видно из файла результатов, разные хосты из набора из 10 рабочих хостов потребляли разные входные данные:

# /scratch365/dduhaime/celery-test.txt content
10.32.77.210-0
10.32.77.210-1
10.32.77.132-2
10.32.77.210-3
10.32.77.142-4
10.32.77.132-5
10.32.77.210-6
10.32.77.192-7
10.32.77.116-8
10.32.77.142-9
10.32.77.132-10
...
...