Может ли сельдерей обновить параметр соединения amqp во время выполнения? - PullRequest
0 голосов
/ 05 марта 2019

Я пытаюсь настроить брокера сельдерея с облачным AMQP.

Поскольку облачный сервис AMQP предоставляет только Java SDK, поэтому я переписал код шифрования с помощью Python, и соединение работает нормально.

Однако существует проблема с отправкой производителя: соединение с облачным сервисом AMQPбудет недействительным после запуска моего проекта некоторое время, потому что Celery amqp pruducer / connection не может обновить параметр соединения.Ошибка 530 Time Expired., Что означает неверный пароль

Вот мой конфиг сельдерея:

task_ignore_result=True,
task_default_queue='default',
task_default_exchange='default',
result_exchange='default',
task_default_exchange_type='direct',
broker_login_method='PLAIN',
task_create_missing_queues=True,
task_serializer='json',
result_serializer='json',
result_expire=1,
accept_content=['json'],
broker_connection_retry=False,
task_queues=(
    Queue(name='tesu', exchange=Exchange(name='test', type='direct'), routing_key='test'),
),
task_routes=(
    {'tasks.add': {
        'queue': 'test_lukou',
        'routing_key': 'test_lukou'
    }},
),
broker_url='amqp://{username}:{password}@{host}:{port}/{virtual_host}'.format(username=provider.get_user_name(),
                                                                              password=provider.get_password(),
                                                                              host=PUBLIC_HOST,
                                                                              port=PORT,
                                                                              virtual_host=VHOST_NAME),
broker_pool_limit=0,
broker_heartbeat=10,
broker_connection_timeout=30, 
result_backend=None, 
event_queue_expires=60,  
worker_prefetch_multiplier=1,

Я обновил broker_url при отправке задачи, но параметр соединения amqp НЕ обновляется.

окружение:
Python 2.7 kombu 4.0.2 celery 4.1.0 rabbitmq 0.2.0

Предоставляет ли Celery способ обновления параметров соединения amqp во время выполнения?
Может кто-нибудь дать мне совет?Заранее спасибо ..

некоторые ссылки:

Сельдерей создает новое соединение для каждой задачи

https://www.cloudamqp.com/docs/celery.html

addion:
Результат отладки
Пароль подключения amqp (никогда не менялся)
обновленный celery conf

1 Ответ

0 голосов
/ 05 марта 2019

Sloved

Установите Ceq amqp, создав новый пул производителей для каждой задачи, например:

class TestAMQP(AMQP):
    @property
    def producer_pool(self):
        self._producer_pool = pools.producers[
            self.app.connection_for_write()]
        self._producer_pool.limit = self.app.pool.limit
        return self._producer_pool

app = Celery('test', include=['tasks'], amqp=TestAMQP)

Надеюсь, что это может помочь кому-то, кто столкнулся с подобной проблемой.

...