Как интегрировать сельдерей в flask + redis? - PullRequest
2 голосов
/ 20 июня 2020

Я недавно загрузил приложение docker и начал его модифицировать. Я пытаюсь реализовать некоторые многопоточные функции, но они задерживают инициализацию сервера. Я поспрашивал, и мне кажется, что сельдерей может быть моим лучшим вариантом, так как я могу установить очередь и заставить его выполнять мои функции, не нарушая само приложение.

Я изменил bootstrap / Firehouse.py для запуска сельдерея функция, поскольку она вызывается после инициализации сервера. Итак, теперь Firehouse.py выглядит так:

import datetime
import json

from flask.json import jsonify
from sqlalchemy import desc

import cache
from shared import app, db
from model.Board import Board
from model.BoardListCatalog import BoardCatalog
from model.Post import render_for_catalog
from model.Thread import Thread
from model.ThreadPosts import _datetime_handler



class Firehose:
    def get(self):
        return jsonify(self._get_threads())

    def get_impl(self):
        threads = self._get_threads()
        for thread in threads:
            board_id = thread["board"]
            board = db.session.query(Board).get(board_id)
            thread["board"] = board.name
        render_for_catalog(threads)
        return threads

    def _get_threads(self):
        firehose_cache_key = "firehose-threads"
        cache_connection = cache.Cache()
        cached_threads = cache_connection.get(firehose_cache_key)
        if cached_threads:
            deserialized_threads = json.loads(cached_threads)
            for thread in deserialized_threads:
                thread["last_updated"] = datetime.datetime.utcfromtimestamp(thread["last_updated"])
            return deserialized_threads
        firehose_limit = app.config["FIREHOSE_LENGTH"]
        raw_threads = db.session.query(Thread).order_by(desc(Thread.last_updated)).limit(firehose_limit).all()
        threads = BoardCatalog()._to_json(raw_threads)
        for thread in threads:
            db_thread = db.session.query(Thread).get(thread["id"])
            thread["board"] = db_thread.board
        cache_friendly = json.dumps(threads, default=_datetime_handler)
        cache_connection.set(firehose_cache_key, cache_friendly)
        return threads

from bootstrap import runMonitor
runMonitor.delay()

shared.py также был изменен, чтобы указывать на настройку URL-адреса сельдерея. Теперь shared.py выглядит так:

import os
import random
import ipaddress
from customjsonencoder import CustomJSONEncoder
from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api
from werkzeug.datastructures import ImmutableDict
from celery import Celery
import jinja_cache


class ManiwaniApp(Flask):
    jinja_options = ImmutableDict(extensions=["jinja2.ext.autoescape", "jinja2.ext.with_"],
                                  bytecode_cache=jinja_cache.KeystoreCache())


app = ManiwaniApp(__name__, static_url_path='')
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["UPLOAD_FOLDER"] = "~/maniwani/uploads"
app.config["THUMB_FOLDER"] = os.path.join(app.config["UPLOAD_FOLDER"], "thumbs")
app.config["SERVE_STATIC"] = True
app.config["SERVE_REST"] = True
app.config["USE_RECAPTCHA"] = False
app.config["FIREHOSE_LENGTH"] = 10
app.config['CELERY_BROKER_URL'] = 'redis://redis:6397/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://redis:6397/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


if os.getenv("MANIWANI_CFG"):
    app.config.from_envvar("MANIWANI_CFG")
app.url_map.strict_slashes = False
app.json_encoder = CustomJSONEncoder
db = SQLAlchemy(app)
migrate = Migrate(app, db)
rest_api = Api(app)

SECRET_FILE = "./deploy-configs/secret"
def get_secret():
    return open(SECRET_FILE).read()

if os.path.exists(SECRET_FILE):
    app.secret_key = get_secret()


def gen_poster_id():
    return '%04X' % random.randint(0, 0xffff)


def ip_to_int(ip_str):
    # The old version of ip_to_int had a logical bug where it would always shift the
    # final result to the left by 8. This is preserved with the `<< 8`.
    return int.from_bytes(
        ipaddress.ip_address(ip_str).packed,
        byteorder="little"
    ) << 8

Наконец bootstrap .py был изменен и включает в себя мой собственный написанный сценарий, код для сценария не важен, но я думаю, что актуальным является celery.task, который был определен, который я включу ниже:

@celery.task
def runMonitor():
    # script that does stuff

Однако проблема в том, что я не могу подключиться к серверу Redis. В docker он определен как имеющий хост 'redis', и этот должен позволять ему указывать на любой IP-адрес, сгенерированный для docker при инициализации сервера.

Может ли кто-нибудь помочь мне выяснить: а) где лучше всего вызвать runMonitor.delay () б) как подключиться к серверу Redis (в настоящее время соединения отклоняются, но сервер находится в сети и порт открыт) * ​​1020 *

ошибка, которую я получаю:

maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 127, in reconnect_on_error
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3580, in subscribe
maniwani_1    |     ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 3466, in execute_command
maniwani_1    |     self.connection = self.connection_pool.get_connection(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 559, in connect
maniwani_1    |     sock = self._connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 615, in _connect
maniwani_1    |     raise err
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 603, in _connect
maniwani_1    |     sock.connect(socket_address)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/gevent/_socket3.py", line 400, in connect
maniwani_1    |     raise error(err, strerror(err))
maniwani_1    | ConnectionRefusedError: [Errno 111] Connection refused
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args, **kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | redis.exceptions.ConnectionError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    |
maniwani_1    | During handling of the above exception, another exception occurred:
maniwani_1    |
maniwani_1    | Traceback (most recent call last):
maniwani_1    |   File "app.py", line 9, in <module>
maniwani_1    |     from blueprints.main import main_blueprint
maniwani_1    |   File "./blueprints/main.py", line 9, in <module>
maniwani_1    |     from model.Firehose import Firehose
maniwani_1    |   File "./model/Firehose.py", line 50, in <module>
maniwani_1    |     runMonitor.delay()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 425, in delay
maniwani_1    |     return self.apply_async(args, kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 565, in apply_async
maniwani_1    |     return app.send_task(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 780, in send_task
maniwani_1    |     amqp.send_task_message(P, name, message, **options)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 455, in _reraise_as_library_errors
maniwani_1    |     reraise(ConnectionError, ConnectionError(text_t(exc)),
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/vine/five.py", line 194, in reraise
maniwani_1    |     raise value.with_traceback(tb)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 451, in _reraise_as_library_errors
maniwani_1    |     yield
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/app/base.py", line 779, in send_task
maniwani_1    |     self.backend.on_task_call(P, task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 344, in on_task_call
maniwani_1    |     self.result_consumer.consume_from(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 169, in consume_from
maniwani_1    |     return self.start(task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 147, in start
maniwani_1    |     self._consume_from(initial_task_id)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 177, in _consume_from
maniwani_1    |     self._pubsub.subscribe(key)
maniwani_1    |   File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
maniwani_1    |     self.gen.throw(type, value, traceback)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 130, in reconnect_on_error
maniwani_1    |     self._ensure(self._reconnect_pubsub, ())
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 355, in ensure
maniwani_1    |     return retry_over_time(
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/kombu/utils/functional.py", line 344, in retry_over_time
maniwani_1    |     return fun(*args, **kwargs)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/celery/backends/redis.py", line 115, in _reconnect_pubsub
maniwani_1    |     metas = self.backend.client.mget(self.subscribed_to)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 1671, in mget
maniwani_1    |     return self.execute_command('MGET', *args, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/client.py", line 898, in execute_command
maniwani_1    |     conn = self.connection or pool.get_connection(command_name, **options)
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 1192, in get_connection
maniwani_1    |     connection.connect()
maniwani_1    |   File "/usr/local/lib/python3.8/dist-packages/redis/connection.py", line 563, in connect
maniwani_1    |     raise ConnectionError(self._error_message(e))
maniwani_1    | kombu.exceptions.OperationalError: Error 111 connecting to redis:6397. Connection refused.
maniwani_1    | unable to load app 0 (mountpoint='') (callable not found or import error)
maniwani_1    | *** no app loaded. going in full dynamic mode ***
maniwani_1    | *** uWSGI is running in multiple interpreter mode ***

redis запущен и работает:

sudo docker container ls
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS              PORTS                NAMES
d4614c9b1013        nginx                 "/docker-entrypoint.…"   25 seconds ago      Up 16 seconds       0.0.0.0:80->80/tcp   maniwani_nginx_1
95503d1e7a28        minio/minio           "/usr/bin/docker-ent…"   28 seconds ago      Up 23 seconds       9000/tcp             maniwani_minio_1
0da8e7a9a573        postgres              "docker-entrypoint.s…"   30 seconds ago      Up 25 seconds       5432/tcp             maniwani_postgres_1
143064a00cb8        maniwani_maniwani     "sh ./docker-entrypo…"   30 seconds ago      Up 24 seconds       3031/tcp, 5000/tcp   maniwani_maniwani_1
5b51b76c4471        redis                 "docker-entrypoint.s…"   30 seconds ago      Up 26 seconds       6379/tcp             maniwani_redis_1
54d637f6afe1        maniwani_captchouli   "sh ./entrypoint.sh"     30 seconds ago      Up 23 seconds       8512/tcp             maniwani_captchouli_1
...