У меня есть работник, который читает сообщения от Кафки, используя aiokafka. Проблема заключается в том, что в очереди находятся тысячи сообщений, и рабочее событие l oop достигает задач 2 КБ, поэтому первая задача возвращается с задержкой (потому что событие l oop занимает много времени для проверки состояния каждой задачи). способ решить это? используя aiokafka или библиотеку asyncio?
вот часть моего кода.
Спасибо за помощь! :)
import uuid
import time
import asyncio
import logging
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from kafka_client import config
logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
class KafkaClient(object):
def __init__(self, *topics, group_id=None):
"""
Class constructor
:param group_id: group ID for shared subscribers
:param topics: list of topic to subscribe
"""
self.topics = topics
connection_config = dict(loop=loop, bootstrap_servers=config.KAFKA_SERVER)
if group_id is not None:
connection_config['group_id'] = group_id
self.producer = AIOKafkaProducer(
loop=loop, bootstrap_servers=config.KAFKA_SERVER,
key_serializer=str.encode,
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
if self.topics:
self.consumer = AIOKafkaConsumer(*self.topics, **connection_config)
async def start_producer(self, app):
await self.producer.start()
async def consume(self, callback=None):
tasks = []
if callback:
await callback()
await self.producer.start()
await self.consumer.start()
try:
async for msg in self.consumer:
try:
message = json.loads(msg.value)
tasks.append(loop.create_task(self.handle_message(message, msg.key, msg.topic)))
except Exception as err:
logger.exception('Failed to process message [msg]: %s [err]: %s' % (msg.value, err))
finally:
# Will leave consumer group; perform autocommit if enabled.
await self.consumer.stop()
await asyncio.gather(*tasks)
def start_consume(self, callback=None):
logger.info('Start consume [topics]: %s' % self.topics)
loop.run_until_complete(self.consume(callback))