Ограничить число python сопрограмм, чтобы предотвратить нагрузку на событие l oop с помощью aiokafka - PullRequest
0 голосов
/ 20 января 2020

У меня есть работник, который читает сообщения от Кафки, используя aiokafka. Проблема заключается в том, что в очереди находятся тысячи сообщений, и рабочее событие l oop достигает задач 2 КБ, поэтому первая задача возвращается с задержкой (потому что событие l oop занимает много времени для проверки состояния каждой задачи). способ решить это? используя aiokafka или библиотеку asyncio?

вот часть моего кода.

Спасибо за помощь! :)

import uuid
import time
import asyncio
import logging
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from kafka_client import config


logger = logging.getLogger(__name__)
loop = asyncio.get_event_loop()


class KafkaClient(object):
    def __init__(self, *topics, group_id=None):
        """
        Class constructor
        :param group_id: group ID for shared subscribers
        :param topics: list of topic to subscribe
        """
        self.topics = topics
        connection_config = dict(loop=loop, bootstrap_servers=config.KAFKA_SERVER)
        if group_id is not None:
            connection_config['group_id'] = group_id
        self.producer = AIOKafkaProducer(
            loop=loop, bootstrap_servers=config.KAFKA_SERVER,
            key_serializer=str.encode,
            value_serializer=lambda m: json.dumps(m).encode('utf-8'))
        if self.topics:
            self.consumer = AIOKafkaConsumer(*self.topics, **connection_config)

    async def start_producer(self, app):
        await self.producer.start()

    async def consume(self, callback=None):
        tasks = []
        if callback:
            await callback()

        await self.producer.start()
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                try:
                    message = json.loads(msg.value)
                    tasks.append(loop.create_task(self.handle_message(message, msg.key, msg.topic)))
                except Exception as err:
                    logger.exception('Failed to process message [msg]: %s [err]: %s' % (msg.value, err))

        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await self.consumer.stop()
        await asyncio.gather(*tasks)

    def start_consume(self, callback=None):
        logger.info('Start consume [topics]: %s' % self.topics)
        loop.run_until_complete(self.consume(callback))
...