TypeError при разбиении подзадач в группированных задач - PullRequest
1 голос
/ 10 марта 2020

Я пытаюсь сделать ПО C для изучения Celery и как его можно использовать для оптимизации выполнения некоторых задач.

Я пытаюсь написать электронную таблицу с двумя вкладками, в которой я ' Я просто напишу квадратные и кубические значения для заданного диапазона чисел.

Я начну с вычисления значений в двух подзадачах, которые я пытаюсь выполнить в group (optimize/tasks.py:generate_output()), чтобы распараллелить его, затем я запишите его в поток, который будет использоваться для записи файла. Чтобы быть более эффективным, я также использую chunks() (optimize/tasks.py:make_worksheet_data()) в Celery для вычисления данных в подзадачах.

Я добавил несколько разборов аргументов, чтобы попробовать разные варианты: но это похоже на «подрезанные» подзадачи в сгруппированных подписях поднять TypeError:

 $ docker exec -ti optimization_worker_1 bash -c "./run_tasks.py -c 10 -m"     
Namespace(chunks=10, entries=100, multi=True)
Traceback (most recent call last):
  File "./run_tasks.py", line 37, in <module>
    result = generate_file_from_output(parsed_args)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 191, in __call__
    return self._get_current_object()(*a, **kw)
  File "/usr/local/lib/python3.6/site-packages/celery/app/task.py", line 392, in __call__
    return self.run(*args, **kwargs)
  File "/app/tasks.py", line 81, in generate_file_from_output
    generate_output(parsed_args, output)
  File "/app/tasks.py", line 69, in generate_output
    result_get = result.get()
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 703, in get
    on_interval=on_interval,
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 830, in join_native
    raise value
TypeError: __call__() takes 1 positional argument but 2 were given

И все же я не вижу особой разницы между делами сельдерея c относительно groups и chunks,

Итак, мой вопрос: возможно ли встраивать фрагментированные подзадачи в сгруппированные задачи? Как этого добиться?

Мой заказ C :

 $ tree
.
├── Dockerfile
├── docker-compose.yml
├── optimization
│   ├── run_tasks.py
│   └── tasks.py
└── requirements.txt

1 directories, 5 files
  • Dockerfile:
FROM python:3.6.8
WORKDIR /app
ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
VOLUME /app
ENTRYPOINT celery -A .tasks worker --broker="amqp://admin:mypass@rabbit" --concurrency=20 --loglevel=info
  • docker-compose.yml:
---
version: "3"
services:
  rabbit:
    image: rabbitmq:latest
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=mypass
    ports:
      - "5673:5672"
      - "15672:15672"
  worker:
    build:
      context: .
    volumes:
      - ".:/usr/src/myapp"
    links:
      - rabbit
    depends_on:
      - rabbit
    volumes:
      - "./optimization:/app"
      - "/path/to/dump:/dump"
  • optimize/run_tasks.py:
#!/usr/bin/env python3

import argparse
from tasks import generate_file_from_output
import time

if __name__ == '__main__':
    # Arguments parsing :
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-e", "--entries",
        help="set entries number",
        type=int,
        default=100,
        metavar="e",
    )
    # Multi process : one a worksheet?
    parser.add_argument("-m", "--multi", action="store_true")
    # Chunks : optimize by chunk (mutually exclusive with "multi_thread")
    parser.add_argument(
        "-c", "--chunks",
        help="perform the file generation by spliting the data to process into chunks",
        type=int,
        nargs="?",
        const=10,
        default=None,
        metavar="N",
    )
    parsed_args = parser.parse_args()
    print(parsed_args)
    result = generate_file_from_output(parsed_args)
  • optimize/tasks.py:
#!/usr/bin/env python3

from celery import Celery, group
from io import BytesIO
import time
import xlsxwriter
import argparse
import sys

# Celery related :

app = Celery('tasks', broker='amqp://admin:mypass@rabbit', backend='rpc://')

def apply_power(entry, power):
    time.sleep(0.01)
    return entry ** power

def make_data_for_entry(entries, power):
    return [(entry, apply_power(entry, power)) for entry in entries]

@app.task
def decorated_make_data_for_entry(*args):
    return make_data_for_entry(*args)

@app.task
def make_worksheet_data(entries, power, header, chunks):
    if chunks is not None:
        result = decorated_make_data_for_entry.chunks(zip(entries), chunks)(power)
        result = make_data_for_entry.chunks(zip(entries), chunks)(power)
    else:
        result = make_data_for_entry(entries, power)
    return [header] + result

@app.task
def decorated_make_worksheet_data(*args):
    return make_worksheet_data(*args)

def make_worksheet_from_data(workbook, worksheet_data, name):
    worksheet = workbook.add_worksheet(name)
    for i, row in enumerate(worksheet_data):
        for j, cell in enumerate(row):
            worksheet.write(i, j, cell)

def generate_output(parsed_args, output):
    workbook = xlsxwriter.Workbook(output)
    entries = range(parsed_args.entries)
    if parsed_args.multi is not None:
        g = group([
            decorated_make_worksheet_data.s(list(entries), 2, ["value", "square"], parsed_args.chunks),
            decorated_make_worksheet_data.s(list(entries), 3, ["value", "cube"], parsed_args.chunks),
        ])  # also tried without the []
        result = g()
        result_get = result.get()
        worksheet_square_data, worksheet_cube_data = result_get
    else:
        worksheet_square_data = make_worksheet_data(list(entries), 2, ["value", "square"], parsed_args.chunks)
        worksheet_cube_data = make_worksheet_data(list(entries), 3, ["value", "cube"], parsed_args.chunks)
    make_worksheet_from_data(workbook, worksheet_square_data, "square")
    make_worksheet_from_data(workbook, worksheet_cube_data, "cube")
    workbook.close()

@app.task
def generate_file_from_output(parsed_args):
    output = BytesIO()
    generate_output(parsed_args, output)
    with open("/dump/file.xlsx", "wb") as f:
        output.seek(0)
        f.write(output.read())

def main():
    generate_file_from_output()

if __name__ == "__main__":
    main()
  • requirements.txt:
amqp==2.5.2
billiard==3.6.3.0
celery==4.4.1
importlib-metadata==1.5.0
kombu==4.6.8
pytz==2019.3
vine==1.3.0
XlsxWriter==1.2.8
zipp==3.1.0

В рабочие журналы добавляется только имя задачи:

worker_2  | [2020-03-10 11:06:55,849: INFO/MainProcess] Received task: tasks.decorated_make_worksheet_data[5da60d3b-decb-483a-bbd2-369019540f44]
worker_2  | [2020-03-10 11:06:55,851: ERROR/ForkPoolWorker-16] Task tasks.decorated_make_worksheet_data[5da60d3b-decb-483a-bbd2-369019540f44] raised unexpected: TypeError('__call__() takes 1 positional argument but 2 were given',)
worker_2  | Traceback (most recent call last):
worker_2  |   File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
worker_2  |     R = retval = fun(*args, **kwargs)
worker_2  |   File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
worker_2  |     return self.run(*args, **kwargs)
worker_2  |   File "/app/tasks.py", line 40, in decorated_make_worksheet_data
worker_2  |     return make_worksheet_data(*args)
worker_2  |   File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 191, in __call__
worker_2  |     return self._get_current_object()(*a, **kw)
worker_2  |   File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 651, in __protected_call__
worker_2  |     return orig(self, *args, **kwargs)
worker_2  |   File "/usr/local/lib/python3.6/site-packages/celery/app/task.py", line 392, in __call__
worker_2  |     return self.run(*args, **kwargs)
worker_2  |   File "/app/tasks.py", line 32, in make_worksheet_data
worker_2  |     result = decorated_make_data_for_entry.chunks(zip(entries), chunks)(power)
worker_2  | TypeError: __call__() takes 1 positional argument but 2 were given
...