Вот моя структура каталогов:
app
|-- automate-scan.py
|-- helpers
| |-- __init__.py
| |-- __init__.pyc
| |-- tasks.py
| `-- tasks.pyc
|-- __init__.py
`-- __init__.pyc
Теперь вот что я пытаюсь запустить:
atuomate_scan.py
import os
from configobj import ConfigObj
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://user:pass@localhost/my_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('initialScanning'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
config_file = '/home/myuser/config.cfg'
config_obj = ConfigObj(config_file)
repo_list_file = config_obj.get('files').get('repo_file')
@celapp.task()
def run_scan(repo):
print repo
os.system("git clone " + repo)
repo_to_scan = repo.split('/')[1].split('.')[0]
os.system("python /home/someuser/myscript.py file:///home/someuser/repo_collection/" + repo_to_scan + " --json --regex")
@celapp.task()
def final_task():
print "hakuna matata"
хранилища - в приведенном ниже коде приведен список строк
tasks.py
import os
from configobj import ConfigObj
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
from celery import chord
from helpers import tasks
f = open('/home/someuser/repositories.lst')
os.system("mkdir /home/someuser/repo_collection")
repositories = f.read().splitlines()
f.close()
print repositories
print "\n\n"
created_task = (tasks.run_scan.subtask((repo)) for (repo) in repositories)
finaltask = tasks.final_task.subtask()
res = chord(created_task,queue='initialScanning')(finaltask)
Когда я запускаю automate-scan.py
, я получаю следующую ошибку:
Traceback (most recent call last):
File "app/automate-scan.py", line 20, in <module>
res = chord(created_task,queue='initialScanning')(finaltask)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1189, in __call__
return self.apply_async((), {'body': body} if body else {}, **options)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1232, in apply_async
return self.run(tasks, body, args, task_id=task_id, **options)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1277, in run
header_result = header(*partial_args, task_id=group_id, **options)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 953, in __call__
return self.apply_async(partial_args, **options)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 978, in apply_async
args=args, kwargs=kwargs, **options))
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 1054, in _apply_tasks
**options)
File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 218, in apply_async
return _apply(args, kwargs, **options)
File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 513, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: run_scan() takes exactly 1 argument (50 given)
Я не понимаю, где я даю 50 аргументов. Пожалуйста, дайте мне знать, где я иду не так и что нужно изменить, чтобы это исправить.
Редактировать 1
Также я попытался изменить строку:
created_task = (tasks.run_scan.subtask((repo)) for (repo) in repositories)
до
created_task = (tasks.run_scan.subtask(repo) for repo in repositories)
также. Но те же результаты