Kafka Consumer прерывисто зависает - PullRequest
0 голосов
/ 24 апреля 2020

У меня есть процесс загрузки нескольких тем Кафки в базу данных.

Работает и загружает данные, но периодически пользователь зависает на 7-12 часов, а затем возвращает сообщения.

Это происходит только для 1 или 2 тем из 11 в моем списке.

Поскольку темы имеют разные характеристики (некоторые json, некоторые являются авро, а некоторые зашифрованы ) Я использую потребителя для каждой топи c.

В сокращенном коде ниже;

  1. Я читаю темы из themes_list json file
  2. Создайте объект слушателей для каждого потребителя / topi c
  3. Прослушайте разделы и обработайте данные для загрузки
from kafka import KafkaConsumer

class KafkaListener:
    def init(self,topic):

        self.topic_name = str(topic.get("topic_name")). # coming from topics_list json file
        self.offset = topic.get("offset", "earliest") # default earliest, would like to be able to control per topic
        self.timeout = topic.get("timeout", 24000) # default to 24,000 would like to be able to control per topic

        self.consumer = KafkaConsumer(
            group_id="group1",
            bootstrap_servers=bootstrap_server_address,
            auto_offset_reset=self.offset,
            value_deserializer=deserializer,
            consumer_timeout_ms=self.timeout,
        )
        self.consumer.subscribe([self.topic_name])

    def load(self):
        log(f"Started listening topic: {self.topic_name}")

        for message in self.consumer:
            < process the message >

        log(f"Load completed for topic: {self.topic_name}")

# read topics from file
with topics_list_file.open("r") as f_topics:
    topics_read = json.load(f_topics)

# build listeners for each topic
for topic in topics:
    listeners[topic] = KafkaListener(topics[topic])

# listen the topics, load data
while True:
    for listener in listeners:
        listener.load()  

файл themes_list

[
{
    "topic_name": "json_topic1",
    "message_type": "json"
},
{
    "topic_name": "avro_topic1",
    "message_type": "avro",
    "avro_schema": "avro_topic1_schema"
},
{
    "topic_name": "avro.topic2",
    "message_type": "avro",
    "avro_schema": "avro_topic2_schema"
},
{
    "topic_name": "json.topic2",
    "message_type": "json",
    "encrypted": "True"
}
]

отрывок из последний файл журнала

...
2020-04-23 13:03:46 Started listening topic: json_topic1
2020-04-23 20:10:09 Load completed for topic: json_topic1
...   

В этом примере слушатель ждал около 7 часов, прежде чем вернуть данные.

Как я могу предотвратить эту задержку?

Любой другой подход, который я должен использовать для этого варианта использования?

Обновление: добавлен код для установки topic names и значения по умолчанию для offset и timeout

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...