Dockerised Python Celery не регистрируется в настроенном формате - PullRequest
0 голосов
/ 21 июня 2019

У меня есть особая проблема, когда мой проект на python выполняет задачу celery.Я попытался celery.signals и worker_hijack_root_logger = False , чтобы настроить свои собственные журналы, но, похоже, ничего не работает.Если я смогу отформатировать его, то он, кажется, иногда печатает и пропускает журналы в другой раз.Это заставило меня задуматься, не хватает ли я чего-то для успешной заготовки сельдерея.

Сценарий :

Формат, в котором я хочу напечатать, -

Format=%(version:1.0.0)s %(timestamp:asctime)s %(severity:levelname)s %(service_id:tosca-o)s %(message:message)s

Для этого я создал customClass для переопределения pythonjsonformatter -

from datetime import datetime
from pythonjsonlogger import jsonlogger

class JsonFormatter(jsonlogger.JsonFormatter):

    def add_fields(self, log_record, record, message_dict):
        """
        Override this method to implement custom logic for adding fields.
        """
        record.asctime = self.formatTime(record, self.datefmt)
        for field in self._required_fields:
            key = field.split(":")[0]
            value = field.split(":")[1]
            if record.__dict__.get(value):
                log_record[key] = record.__dict__.get(value)
            else:
                log_record[key] = value
        log_record.update(message_dict)
        jsonlogger.merge_record_extra(record, log_record, reserved=self._skip_fields)

        if self.timestamp:
            key = self.timestamp if isinstance(self.timestamp, str) else 'timestamp'
            log_record[key] = datetime.utcnow()

Мой logging.conf файл выглядит так -

[loggers]
keys=root

[handlers]
keys=consoleHandler

[formatters]
keys=customFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[handler_consoleHandler]
level=DEBUG
class=StreamHandler
formatter=customFormatter
args=(sys.stdout,)

[formatter_customFormatter]
format=%(version:1.0.0)s %(timestamp:asctime)s %(severity:levelname)s %(service_id:my-service)s %(message:message)s
class=test.myJsonFormatter.JsonFormatter

У меня есть __ init __. Py , где проект инициализирован ирегистратор установлен -

   from flask_httpauth import HTTPBasicAuth
    from flask_restful import Resource, Api, reqparse, fields
    from flask_restful_swagger import swagger
    import celery.events.state
    from celery import Celery
    from celery.app.log import TaskFormatter
    from celery.signals import after_setup_task_logger
    from my.ModelClasses import myCommandModel, myModel, myRequestResultModel, myExtraArgsModel

    #Set the queue for celery
    io_q = Queue()

    app = Flask(__name__)
    auth = HTTPBasicAuth()

    this_path = sys.path[0]

    config = SafeConfigParser()
    #contains configurations for celery, redis and flask
    config.read('config.ini')


    #Configure Logging to sys.stdout
    logging.config.fileConfig('my/logging.conf')
    logger = logging.getLogger(__name__)


    app.config['broker_url'] = config.get("Default", "CELERY_BROKER_URL")
    app.config['result_backend'] = config.get("Default", "CELERY_RESULT_BACKEND")
    str_task_timeout = config.get("Default", "CELERY_TASK_TIMEOUT")
    my_root = config.get("Default", "my_root")
    my_filter = config.get("Default", "my_filter")
    task_timeout = int(str_task_timeout)

    api = swagger.docs(Api(app), apiVersion='1.0')

    celery = Celery(app.name, broker=app.config['broker_url'], backend=app.config['result_backend'])
    celery.control.time_limit('do_long_running_task', soft=900, hard=900, reply=True)
    celery.conf.update(app.config) 

Затем у меня есть celery_task.py , где я хочу регистрировать выполненную задачу -

import subprocess
import datetime, time, json
from subprocess import Popen, PIPE
from myapp import api, app, celery, task_timeout, logger

@celery.task(bind=True, soft_time_limit=task_timeout, time_limit=(task_timeout+10))
def do_long_running_task(self, cmd, type='myapp'):
    with app.app_context():

        output = ""
        self.update_state(state='IN_PROGRESS',
                          meta={'output': output,
                                'description': "",
                                'returncode': None})
        print(str.format("About to execute: {0}", cmd))
        proc = Popen([cmd], stdout=PIPE, stderr=subprocess.STDOUT, shell=True)
        logger.info("About to execute %s",cmd)
        for line in iter(proc.stdout.readline, b''):
            print(str(line))                
            output = output + line.decode("ascii")
            self.update_state(state='IN_PROGRESS', meta={'output': output,'description': "Task is still executing",'returncode': None})

        return_code = proc.poll()
        if return_code is 0:
            meta = {'output': output,
                        'returncode': proc.returncode,
                        'description': "Task is complete"
                    }
            self.update_state(state='COMPLETED',
                              meta=meta)
        elif return_code is not 0:
            #failure
            meta = {'output': output,
                        'returncode': return_code,
                        'description': str.format("Celery ran the task, but {0} has reported with error", type)
                    }
            self.update_state(state='FAILED',
                          meta=meta)
        if len(output) is 0:
            output = "no output"
            meta = {'output': output,
                        'returncode': return_code,
                        'description': str.format("Celery ran the task, but {0} has reported with error", type)
                    }
        return meta

Печать 1031 * cmd выше печатает журналы, но не в моем собственном формате, даже если я заменяю его на logger объект, импортированный из myapp , где я настроил logger какОбязательно.

Я пытался использовать celery.signals и угнать рут, но не смог по-прежнему печатать в моем собственном формате.

Я выполняю эту закрепленную службу как -

services:

  myService:
    image: my-service
    ports:
      - '8000:8000'
    volumes:
      - ./vol/data:/data
    restart: always
    depends_on:
      - "celery"

  celery:
    image: my-service
    command: /usr/bin/celery worker -A myapp.celery --loglevel=info
    volumes:
      - ./vol/data:/data
    restart: always
    depends_on:
      - "redis"

  redis:
    image: 'redis:alpine'
    command: redis-server --appendonly yes
    volumes:
      - 'redis:/data'
    restart: always
    ports:
      - '6379:6379'

  node_alpinex:
    image: node_alpinex
    ports:
      - '4000:22'

volumes:
  redis:

Итак, команда для выполнения сельдерея - -

"Рабочий из сельдерея -A myapp.celery --loglevel =info "

Пример печати

  celery_1        | [2019-06-21 12:27:18,970: WARNING/ForkPoolWorker-2] b'PLAY [node_alpinex] ************************************************************\n'
    celery_1        | [2019-06-21 12:27:18,976: WARNING/ForkPoolWorker-2] b'META: ran handlers\n'
    celery_1        | [2019-06-21 12:27:18,994: WARNING/ForkPoolWorker-2] b'\n'
    celery_1        | [2019-06-21 12:27:18,997: WARNING/ForkPoolWorker-2] b'TASK [Bootstrap a host without python3 installed] ******************************\n'
    celery_1        | [2019-06-21 12:27:18,998: WARNING/ForkPoolWorker-2] b'task path: /usr/src/data/test4.yml:8\n'

Я хочу выше печатать в моем формате.Может кто-нибудь помочь?

Структура файла:

app
 |--myapp
     |--__init__.py
     |--celery_task.py
     |--logging.conf
     |-- etc etc
 |--config.ini 
...