Я пытаюсь сделать ПО C для изучения Celery и как его можно использовать для оптимизации выполнения некоторых задач.
Я пытаюсь написать электронную таблицу с двумя вкладками, в которой я ' Я просто напишу квадратные и кубические значения для заданного диапазона чисел.
Я начну с вычисления значений в двух подзадачах, которые я пытаюсь выполнить в group
(optimize/tasks.py:generate_output()
), чтобы распараллелить его, затем я запишите его в поток, который будет использоваться для записи файла. Чтобы быть более эффективным, я также использую chunks()
(optimize/tasks.py:make_worksheet_data()
) в Celery для вычисления данных в подзадачах.
Я добавил несколько разборов аргументов, чтобы попробовать разные варианты: но это похоже на «подрезанные» подзадачи в сгруппированных подписях поднять TypeError
:
$ docker exec -ti optimization_worker_1 bash -c "./run_tasks.py -c 10 -m"
Namespace(chunks=10, entries=100, multi=True)
Traceback (most recent call last):
File "./run_tasks.py", line 37, in <module>
result = generate_file_from_output(parsed_args)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 191, in __call__
return self._get_current_object()(*a, **kw)
File "/usr/local/lib/python3.6/site-packages/celery/app/task.py", line 392, in __call__
return self.run(*args, **kwargs)
File "/app/tasks.py", line 81, in generate_file_from_output
generate_output(parsed_args, output)
File "/app/tasks.py", line 69, in generate_output
result_get = result.get()
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 703, in get
on_interval=on_interval,
File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 830, in join_native
raise value
TypeError: __call__() takes 1 positional argument but 2 were given
И все же я не вижу особой разницы между делами сельдерея c относительно groups
и chunks
,
Итак, мой вопрос: возможно ли встраивать фрагментированные подзадачи в сгруппированные задачи? Как этого добиться?
Мой заказ C :
$ tree
.
├── Dockerfile
├── docker-compose.yml
├── optimization
│ ├── run_tasks.py
│ └── tasks.py
└── requirements.txt
1 directories, 5 files
FROM python:3.6.8
WORKDIR /app
ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
VOLUME /app
ENTRYPOINT celery -A .tasks worker --broker="amqp://admin:mypass@rabbit" --concurrency=20 --loglevel=info
---
version: "3"
services:
rabbit:
image: rabbitmq:latest
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=mypass
ports:
- "5673:5672"
- "15672:15672"
worker:
build:
context: .
volumes:
- ".:/usr/src/myapp"
links:
- rabbit
depends_on:
- rabbit
volumes:
- "./optimization:/app"
- "/path/to/dump:/dump"
#!/usr/bin/env python3
import argparse
from tasks import generate_file_from_output
import time
if __name__ == '__main__':
# Arguments parsing :
parser = argparse.ArgumentParser()
parser.add_argument(
"-e", "--entries",
help="set entries number",
type=int,
default=100,
metavar="e",
)
# Multi process : one a worksheet?
parser.add_argument("-m", "--multi", action="store_true")
# Chunks : optimize by chunk (mutually exclusive with "multi_thread")
parser.add_argument(
"-c", "--chunks",
help="perform the file generation by spliting the data to process into chunks",
type=int,
nargs="?",
const=10,
default=None,
metavar="N",
)
parsed_args = parser.parse_args()
print(parsed_args)
result = generate_file_from_output(parsed_args)
#!/usr/bin/env python3
from celery import Celery, group
from io import BytesIO
import time
import xlsxwriter
import argparse
import sys
# Celery related :
app = Celery('tasks', broker='amqp://admin:mypass@rabbit', backend='rpc://')
def apply_power(entry, power):
time.sleep(0.01)
return entry ** power
def make_data_for_entry(entries, power):
return [(entry, apply_power(entry, power)) for entry in entries]
@app.task
def decorated_make_data_for_entry(*args):
return make_data_for_entry(*args)
@app.task
def make_worksheet_data(entries, power, header, chunks):
if chunks is not None:
result = decorated_make_data_for_entry.chunks(zip(entries), chunks)(power)
result = make_data_for_entry.chunks(zip(entries), chunks)(power)
else:
result = make_data_for_entry(entries, power)
return [header] + result
@app.task
def decorated_make_worksheet_data(*args):
return make_worksheet_data(*args)
def make_worksheet_from_data(workbook, worksheet_data, name):
worksheet = workbook.add_worksheet(name)
for i, row in enumerate(worksheet_data):
for j, cell in enumerate(row):
worksheet.write(i, j, cell)
def generate_output(parsed_args, output):
workbook = xlsxwriter.Workbook(output)
entries = range(parsed_args.entries)
if parsed_args.multi is not None:
g = group([
decorated_make_worksheet_data.s(list(entries), 2, ["value", "square"], parsed_args.chunks),
decorated_make_worksheet_data.s(list(entries), 3, ["value", "cube"], parsed_args.chunks),
]) # also tried without the []
result = g()
result_get = result.get()
worksheet_square_data, worksheet_cube_data = result_get
else:
worksheet_square_data = make_worksheet_data(list(entries), 2, ["value", "square"], parsed_args.chunks)
worksheet_cube_data = make_worksheet_data(list(entries), 3, ["value", "cube"], parsed_args.chunks)
make_worksheet_from_data(workbook, worksheet_square_data, "square")
make_worksheet_from_data(workbook, worksheet_cube_data, "cube")
workbook.close()
@app.task
def generate_file_from_output(parsed_args):
output = BytesIO()
generate_output(parsed_args, output)
with open("/dump/file.xlsx", "wb") as f:
output.seek(0)
f.write(output.read())
def main():
generate_file_from_output()
if __name__ == "__main__":
main()
amqp==2.5.2
billiard==3.6.3.0
celery==4.4.1
importlib-metadata==1.5.0
kombu==4.6.8
pytz==2019.3
vine==1.3.0
XlsxWriter==1.2.8
zipp==3.1.0
В рабочие журналы добавляется только имя задачи:
worker_2 | [2020-03-10 11:06:55,849: INFO/MainProcess] Received task: tasks.decorated_make_worksheet_data[5da60d3b-decb-483a-bbd2-369019540f44]
worker_2 | [2020-03-10 11:06:55,851: ERROR/ForkPoolWorker-16] Task tasks.decorated_make_worksheet_data[5da60d3b-decb-483a-bbd2-369019540f44] raised unexpected: TypeError('__call__() takes 1 positional argument but 2 were given',)
worker_2 | Traceback (most recent call last):
worker_2 | File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
worker_2 | R = retval = fun(*args, **kwargs)
worker_2 | File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
worker_2 | return self.run(*args, **kwargs)
worker_2 | File "/app/tasks.py", line 40, in decorated_make_worksheet_data
worker_2 | return make_worksheet_data(*args)
worker_2 | File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 191, in __call__
worker_2 | return self._get_current_object()(*a, **kw)
worker_2 | File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 651, in __protected_call__
worker_2 | return orig(self, *args, **kwargs)
worker_2 | File "/usr/local/lib/python3.6/site-packages/celery/app/task.py", line 392, in __call__
worker_2 | return self.run(*args, **kwargs)
worker_2 | File "/app/tasks.py", line 32, in make_worksheet_data
worker_2 | result = decorated_make_data_for_entry.chunks(zip(entries), chunks)(power)
worker_2 | TypeError: __call__() takes 1 positional argument but 2 were given