CeleryBeat: возможно ли создавать сообщение для Кафки при каждом ударе? - PullRequest
0 голосов
/ 03 октября 2019

Мне бы хотелось иметь планировщик, который позволял бы мне создавать сообщения на темы AVRO через версию Confluent Kafka, и я решил попробовать CeleryBeat.

Я использую 'v1.2' из'https://github.com/confluentinc/confluent-kafka-python' и' Redis 'в качестве брокера для Celery.

Я не уверен, выполнимо ли то, чего я пытаюсь достичь, или нет. Идея состоит в том, чтобы использовать CeleryBeat, чтобы работники периодически создавали сообщения на заданную тему Kafka AVRO. Однако по какой-то причине кажется, что производитель не может отправить сообщения.

У меня есть проект со следующей структурой:

beat_scheduler
  -scheduler
    -__init__.py
    -celery.py
    -celeryconfig.py
    -tasks.py 
#celeryconfig.py

broker_url = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Dublin'
enable_utc = True
#celery.py
from celery import Celery
app = Celery('scheduler',
             include=['scheduler.tasks'])
app.config_from_object('scheduler.celeryconfig')



app.conf.beat_schedule = {
    "see-you-in-ten-seconds-task": {
        "task": "scheduler.tasks.check",
        "schedule": 10.0,
        "kwargs":{"test_name":"wfbss"}
    }
}
#tasks.py

from scheduler.celery import app
from confluent_kafka.avro import AvroProducer
from avro import schema
import sys, requests,os
from datetime import datetime, timezone

default_key_value = {"service_name": os.environ['service_name']}


def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d]\n' % \
                         (msg.topic(), msg.partition()))


def find_latest_schema(topic,SCHEMA_REGISTRY_URL):
    subject = topic
    versions_response = requests.get(
        url="{}/subjects/{}/versions".format(SCHEMA_REGISTRY_URL, subject),
        headers={
            "Content-Type": "application/vnd.schemaregistry.v1+json",
        },
    )
    latest_version = versions_response.json()[-1]
    schema_response = requests.get(
        url="{}/subjects/{}/versions/{}".format(SCHEMA_REGISTRY_URL, subject, latest_version),
        headers={
            "Content-Type": "application/vnd.schemaregistry.v1+json",
        },
    )
    schema_response_json = schema_response.json()

    return schema_response_json["id"], schema.Parse(schema_response_json["schema"])


class Producer:
    """
            Avro Producer
            It needs the following kargs
                - bootstrap_servers
                - schema_reqistry_url
                - topic


    """

    def __init__(self, *args, **kwargs):

        self.__servers = kwargs.pop('bootstrap_servers', None)
        self.__registry = kwargs.pop('schema_reqistry_url', None)
        self.topic = kwargs.pop('topic', None)
        self.security_protocol = kwargs.pop('security_protocol', 'plaintext')
        self.ssl_ca_location = kwargs.pop('ssl_ca_location','./configuration/cacert.pem')

        self.debug_level = kwargs.pop('debug_level', 'security')
        if not self.topic:
            sys.stderr.write('%% Topic name not specified: \n')
            raise ValueError("Topic name not specified")

        schema_id, default_value_schema = find_latest_schema(self.topic+"-value",self.__registry)
        schema_id, default_key_schema = find_latest_schema(self.topic+"-key",self.__registry)


        self.default_value_schema = default_value_schema
        self.producer = AvroProducer({'bootstrap.servers': self.__servers,
                                      'schema.registry.url': self.__registry,
                                      'log.connection.close':'false',
                                      'security.protocol': self.security_protocol,
                                      'ssl.ca.location': self.ssl_ca_location
                                      },
                                     default_key_schema=default_key_schema, default_value_schema=default_value_schema)

    def produce_message(self, **kwargs):

        """
            Sends message to kafka by encoding with specified avro schema
                @:param: topic: topic name
                @:param: value: An object to serialize
                @:param: key: An object to serialize
        """

        # get schemas from  kwargs if defined

        callback_function = kwargs.pop('delivery_callback', delivery_callback)
        value = kwargs.pop('value', None)
        key = kwargs.pop('key', None)
        try:
            self.producer.produce(topic=self.topic, value=value, key=key, callback=callback_function)
            print("so far so good ")

        except Exception as e:
            # I don't have any errors
            print("ERROR:",e)
            return None
        self.producer.poll(0)
        print("AFTER POLL")
        self.producer.flush()
        print("AFTER FLUSH ")
        return {"topic": self.topic, "value": value, "sent": True}




my_producer = Producer(
            bootstrap_servers=os.environ.get('brokers'),
            schema_reqistry_url=os.environ.get('schema_registry'),
            topic='my-test-topic',
            security_protocol=os.environ.get('security_protocol', 'plaintext'))


def request_saas_test_to_start(test_name):
    body = {
        "cloud": "vcloud",
        "timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
        "scheduled_test_to_run": test_name
    }
    print("Sending :", body)
    my_producer.produce_message(value=body, key=default_key_value)

@app.task
def check(**kwargs):
    print("I am checking you stuff:",kwargs)
    request_saas_test_to_start(test_name='wfbss')


Поэтому я сначала запускаю ритм сельдерея celery -A scheduler beat --loglevel=info

который, кажется, выполняет работу каждые 10 секунд

[2019-10-03 16:10:08,513: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:18,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:28,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:38,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)

, а затем я запускаю работника

celery -A scheduler worker --loglevel=info

, который, кажется, не в состоянии выдать сообщение длязаданная тема

[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] so far so good
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] AFTER POLL
[2019-10-03 16:17:44,329: WARNING/ForkPoolWorker-6] so far so good
[2019-10-03 16:17:44,330: WARNING/ForkPoolWorker-6] AFTER POLL
[2019-10-03 16:17:44,343: WARNING/ForkPoolWorker-1] so far so good
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-1] AFTER POLL
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-8] so far so good
[2019-10-03 16:17:44,345: WARNING/ForkPoolWorker-8] AFTER POLL
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-3] so far so good
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-4] so far so good
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-3] AFTER POLL
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-4] AFTER POLL
[2019-10-03 16:17:44,350: WARNING/ForkPoolWorker-7] so far so good
[2019-10-03 16:17:44,351: WARNING/ForkPoolWorker-7] AFTER POLL
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] so far so good
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] AFTER POLL

Я добавил несколько print в функцию produce_message, которая генерирует сообщения в тему AVRO. Я уверен, что produce_message работает, потому что я использую его в других проектах. Здесь я увидел, что по некоторым причинам метод flush, который ожидает доставки всех сообщений в очереди источника, кажется, не завершает свою работу. Фактически, из журнала консоли я вижу, что print("AFTER FLUSH") никогда не распечатывается, и потребитель никогда не получает никаких сообщений.

На мой взгляд, это похоже на проблему с тем, как работает библиотека python для слитой Kafka, когдаон используется так, как я пытаюсь его использовать.

Фактически, из журнала рабочей консоли кажется, что рабочий ставит в очередь полученные задачи, а затем пытается выдать сообщение Кафке.

[2019-10-03 16:17:44,225: INFO/MainProcess] Received task: scheduler.tasks.check[a038a27d-b8de-4401-9b2e-ef18216837ac]  
[2019-10-03 16:17:44,227: INFO/MainProcess] Received task: scheduler.tasks.check[804a1790-cc96-4d73-bff8-a94a05896135]  
[2019-10-03 16:17:44,229: INFO/MainProcess] Received task: scheduler.tasks.check[0c863189-c855-4b47-9997-caf88d2e0873]  
[2019-10-03 16:17:44,230: INFO/MainProcess] Received task: scheduler.tasks.check[040b612c-2a05-4f34-9683-21ad120935df]  
[2019-10-03 16:17:44,232: INFO/MainProcess] Received task: scheduler.tasks.check[c0431e9c-f5b5-4ce2-8fd9-9e785b77b197]  
[2019-10-03 16:17:44,234: INFO/MainProcess] Received task: scheduler.tasks.check[1fde4114-2c2d-4729-9b99-9990e1889cc9]  
[2019-10-03 16:17:44,235: INFO/MainProcess] Received task: scheduler.tasks.check[e8782905-a721-4123-a1fd-0ce3cd14ff5e]  
[2019-10-03 16:17:44,237: INFO/MainProcess] Received task: scheduler.tasks.check[7f0aa553-87ab-446f-b07a-06eea43c82b5]  
[2019-10-03 16:17:44,238: INFO/MainProcess] Received task: scheduler.tasks.check[af770811-3105-4625-84e0-cd87a5d1806b]  
[2019-10-03 16:17:44,240: INFO/MainProcess] Received task: scheduler.tasks.check[d6dbac83-1cf1-4848-aa3c-6ebd035c56e7]  
[2019-10-03 16:17:44,242: INFO/MainProcess] Received task: scheduler.tasks.check[3e7734d5-fd62-463a-80ba-21de2451ef30]  
[2019-10-03 16:17:44,243: INFO/MainProcess] Received task: scheduler.tasks.check[a6c26e93-d76b-4cc3-b63f-a2c0b8366742]  
[2019-10-03 16:17:44,245: INFO/MainProcess] Received task: scheduler.tasks.check[6b0fca00-9e22-43ea-a24c-4d542b437343]  
[2019-10-03 16:17:44,247: INFO/MainProcess] Received task: scheduler.tasks.check[eadb0342-5e33-4861-b120-2c1c6eb118ea]  
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] so far so good
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] AFTER POLL
[2019-10-03 16:17:44,329: WARNING/ForkPoolWorker-6] so far so good
[2019-10-03 16:17:44,330: WARNING/ForkPoolWorker-6] AFTER POLL
[2019-10-03 16:17:44,343: WARNING/ForkPoolWorker-1] so far so good
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-1] AFTER POLL
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-8] so far so good
[2019-10-03 16:17:44,345: WARNING/ForkPoolWorker-8] AFTER POLL
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-3] so far so good
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-4] so far so good
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-3] AFTER POLL
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-4] AFTER POLL
[2019-10-03 16:17:44,350: WARNING/ForkPoolWorker-7] so far so good
[2019-10-03 16:17:44,351: WARNING/ForkPoolWorker-7] AFTER POLL
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] so far so good
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] AFTER POLL

Я знаю, что библиотека python для конфлюентной Kafka поддерживает только синхронные операции, и мне было интересно, может ли это быть проблемой блокировки. Поскольку я заинтересован в том, чтобы научиться правильно использовать CeleryBeat для отправки команд, мне было интересно, может ли кто-нибудь указать мне правильное направление.

Заранее спасибо

...