Мне бы хотелось иметь планировщик, который позволял бы мне создавать сообщения на темы AVRO через версию Confluent Kafka, и я решил попробовать CeleryBeat.
Я использую 'v1.2' из'https://github.com/confluentinc/confluent-kafka-python' и' Redis 'в качестве брокера для Celery.
Я не уверен, выполнимо ли то, чего я пытаюсь достичь, или нет. Идея состоит в том, чтобы использовать CeleryBeat, чтобы работники периодически создавали сообщения на заданную тему Kafka AVRO. Однако по какой-то причине кажется, что производитель не может отправить сообщения.
У меня есть проект со следующей структурой:
beat_scheduler
-scheduler
-__init__.py
-celery.py
-celeryconfig.py
-tasks.py
#celeryconfig.py
broker_url = 'redis://localhost:6379/0'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Dublin'
enable_utc = True
#celery.py
from celery import Celery
app = Celery('scheduler',
include=['scheduler.tasks'])
app.config_from_object('scheduler.celeryconfig')
app.conf.beat_schedule = {
"see-you-in-ten-seconds-task": {
"task": "scheduler.tasks.check",
"schedule": 10.0,
"kwargs":{"test_name":"wfbss"}
}
}
#tasks.py
from scheduler.celery import app
from confluent_kafka.avro import AvroProducer
from avro import schema
import sys, requests,os
from datetime import datetime, timezone
default_key_value = {"service_name": os.environ['service_name']}
def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d]\n' % \
(msg.topic(), msg.partition()))
def find_latest_schema(topic,SCHEMA_REGISTRY_URL):
subject = topic
versions_response = requests.get(
url="{}/subjects/{}/versions".format(SCHEMA_REGISTRY_URL, subject),
headers={
"Content-Type": "application/vnd.schemaregistry.v1+json",
},
)
latest_version = versions_response.json()[-1]
schema_response = requests.get(
url="{}/subjects/{}/versions/{}".format(SCHEMA_REGISTRY_URL, subject, latest_version),
headers={
"Content-Type": "application/vnd.schemaregistry.v1+json",
},
)
schema_response_json = schema_response.json()
return schema_response_json["id"], schema.Parse(schema_response_json["schema"])
class Producer:
"""
Avro Producer
It needs the following kargs
- bootstrap_servers
- schema_reqistry_url
- topic
"""
def __init__(self, *args, **kwargs):
self.__servers = kwargs.pop('bootstrap_servers', None)
self.__registry = kwargs.pop('schema_reqistry_url', None)
self.topic = kwargs.pop('topic', None)
self.security_protocol = kwargs.pop('security_protocol', 'plaintext')
self.ssl_ca_location = kwargs.pop('ssl_ca_location','./configuration/cacert.pem')
self.debug_level = kwargs.pop('debug_level', 'security')
if not self.topic:
sys.stderr.write('%% Topic name not specified: \n')
raise ValueError("Topic name not specified")
schema_id, default_value_schema = find_latest_schema(self.topic+"-value",self.__registry)
schema_id, default_key_schema = find_latest_schema(self.topic+"-key",self.__registry)
self.default_value_schema = default_value_schema
self.producer = AvroProducer({'bootstrap.servers': self.__servers,
'schema.registry.url': self.__registry,
'log.connection.close':'false',
'security.protocol': self.security_protocol,
'ssl.ca.location': self.ssl_ca_location
},
default_key_schema=default_key_schema, default_value_schema=default_value_schema)
def produce_message(self, **kwargs):
"""
Sends message to kafka by encoding with specified avro schema
@:param: topic: topic name
@:param: value: An object to serialize
@:param: key: An object to serialize
"""
# get schemas from kwargs if defined
callback_function = kwargs.pop('delivery_callback', delivery_callback)
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)
try:
self.producer.produce(topic=self.topic, value=value, key=key, callback=callback_function)
print("so far so good ")
except Exception as e:
# I don't have any errors
print("ERROR:",e)
return None
self.producer.poll(0)
print("AFTER POLL")
self.producer.flush()
print("AFTER FLUSH ")
return {"topic": self.topic, "value": value, "sent": True}
my_producer = Producer(
bootstrap_servers=os.environ.get('brokers'),
schema_reqistry_url=os.environ.get('schema_registry'),
topic='my-test-topic',
security_protocol=os.environ.get('security_protocol', 'plaintext'))
def request_saas_test_to_start(test_name):
body = {
"cloud": "vcloud",
"timestamp": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
"scheduled_test_to_run": test_name
}
print("Sending :", body)
my_producer.produce_message(value=body, key=default_key_value)
@app.task
def check(**kwargs):
print("I am checking you stuff:",kwargs)
request_saas_test_to_start(test_name='wfbss')
Поэтому я сначала запускаю ритм сельдерея celery -A scheduler beat --loglevel=info
который, кажется, выполняет работу каждые 10 секунд
[2019-10-03 16:10:08,513: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:18,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:28,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
[2019-10-03 16:10:38,504: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (scheduler.tasks.check)
, а затем я запускаю работника
celery -A scheduler worker --loglevel=info
, который, кажется, не в состоянии выдать сообщение длязаданная тема
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] so far so good
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] AFTER POLL
[2019-10-03 16:17:44,329: WARNING/ForkPoolWorker-6] so far so good
[2019-10-03 16:17:44,330: WARNING/ForkPoolWorker-6] AFTER POLL
[2019-10-03 16:17:44,343: WARNING/ForkPoolWorker-1] so far so good
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-1] AFTER POLL
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-8] so far so good
[2019-10-03 16:17:44,345: WARNING/ForkPoolWorker-8] AFTER POLL
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-3] so far so good
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-4] so far so good
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-3] AFTER POLL
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-4] AFTER POLL
[2019-10-03 16:17:44,350: WARNING/ForkPoolWorker-7] so far so good
[2019-10-03 16:17:44,351: WARNING/ForkPoolWorker-7] AFTER POLL
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] so far so good
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] AFTER POLL
Я добавил несколько print
в функцию produce_message
, которая генерирует сообщения в тему AVRO. Я уверен, что produce_message
работает, потому что я использую его в других проектах. Здесь я увидел, что по некоторым причинам метод flush
, который ожидает доставки всех сообщений в очереди источника, кажется, не завершает свою работу. Фактически, из журнала консоли я вижу, что print("AFTER FLUSH")
никогда не распечатывается, и потребитель никогда не получает никаких сообщений.
На мой взгляд, это похоже на проблему с тем, как работает библиотека python для слитой Kafka, когдаон используется так, как я пытаюсь его использовать.
Фактически, из журнала рабочей консоли кажется, что рабочий ставит в очередь полученные задачи, а затем пытается выдать сообщение Кафке.
[2019-10-03 16:17:44,225: INFO/MainProcess] Received task: scheduler.tasks.check[a038a27d-b8de-4401-9b2e-ef18216837ac]
[2019-10-03 16:17:44,227: INFO/MainProcess] Received task: scheduler.tasks.check[804a1790-cc96-4d73-bff8-a94a05896135]
[2019-10-03 16:17:44,229: INFO/MainProcess] Received task: scheduler.tasks.check[0c863189-c855-4b47-9997-caf88d2e0873]
[2019-10-03 16:17:44,230: INFO/MainProcess] Received task: scheduler.tasks.check[040b612c-2a05-4f34-9683-21ad120935df]
[2019-10-03 16:17:44,232: INFO/MainProcess] Received task: scheduler.tasks.check[c0431e9c-f5b5-4ce2-8fd9-9e785b77b197]
[2019-10-03 16:17:44,234: INFO/MainProcess] Received task: scheduler.tasks.check[1fde4114-2c2d-4729-9b99-9990e1889cc9]
[2019-10-03 16:17:44,235: INFO/MainProcess] Received task: scheduler.tasks.check[e8782905-a721-4123-a1fd-0ce3cd14ff5e]
[2019-10-03 16:17:44,237: INFO/MainProcess] Received task: scheduler.tasks.check[7f0aa553-87ab-446f-b07a-06eea43c82b5]
[2019-10-03 16:17:44,238: INFO/MainProcess] Received task: scheduler.tasks.check[af770811-3105-4625-84e0-cd87a5d1806b]
[2019-10-03 16:17:44,240: INFO/MainProcess] Received task: scheduler.tasks.check[d6dbac83-1cf1-4848-aa3c-6ebd035c56e7]
[2019-10-03 16:17:44,242: INFO/MainProcess] Received task: scheduler.tasks.check[3e7734d5-fd62-463a-80ba-21de2451ef30]
[2019-10-03 16:17:44,243: INFO/MainProcess] Received task: scheduler.tasks.check[a6c26e93-d76b-4cc3-b63f-a2c0b8366742]
[2019-10-03 16:17:44,245: INFO/MainProcess] Received task: scheduler.tasks.check[6b0fca00-9e22-43ea-a24c-4d542b437343]
[2019-10-03 16:17:44,247: INFO/MainProcess] Received task: scheduler.tasks.check[eadb0342-5e33-4861-b120-2c1c6eb118ea]
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] so far so good
[2019-10-03 16:17:44,328: WARNING/ForkPoolWorker-5] AFTER POLL
[2019-10-03 16:17:44,329: WARNING/ForkPoolWorker-6] so far so good
[2019-10-03 16:17:44,330: WARNING/ForkPoolWorker-6] AFTER POLL
[2019-10-03 16:17:44,343: WARNING/ForkPoolWorker-1] so far so good
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-1] AFTER POLL
[2019-10-03 16:17:44,344: WARNING/ForkPoolWorker-8] so far so good
[2019-10-03 16:17:44,345: WARNING/ForkPoolWorker-8] AFTER POLL
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-3] so far so good
[2019-10-03 16:17:44,348: WARNING/ForkPoolWorker-4] so far so good
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-3] AFTER POLL
[2019-10-03 16:17:44,349: WARNING/ForkPoolWorker-4] AFTER POLL
[2019-10-03 16:17:44,350: WARNING/ForkPoolWorker-7] so far so good
[2019-10-03 16:17:44,351: WARNING/ForkPoolWorker-7] AFTER POLL
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] so far so good
[2019-10-03 16:17:44,352: WARNING/ForkPoolWorker-2] AFTER POLL
Я знаю, что библиотека python для конфлюентной Kafka поддерживает только синхронные операции, и мне было интересно, может ли это быть проблемой блокировки. Поскольку я заинтересован в том, чтобы научиться правильно использовать CeleryBeat для отправки команд, мне было интересно, может ли кто-нибудь указать мне правильное направление.
Заранее спасибо