У меня есть процесс загрузки нескольких тем Кафки в базу данных.
Работает и загружает данные, но периодически пользователь зависает на 7-12 часов, а затем возвращает сообщения.
Это происходит только для 1 или 2 тем из 11 в моем списке.
Поскольку темы имеют разные характеристики (некоторые json, некоторые являются авро, а некоторые зашифрованы ) Я использую потребителя для каждой топи c.
В сокращенном коде ниже;
- Я читаю темы из themes_list json file
- Создайте объект слушателей для каждого потребителя / topi c
- Прослушайте разделы и обработайте данные для загрузки
from kafka import KafkaConsumer
class KafkaListener:
def init(self,topic):
self.topic_name = str(topic.get("topic_name")). # coming from topics_list json file
self.offset = topic.get("offset", "earliest") # default earliest, would like to be able to control per topic
self.timeout = topic.get("timeout", 24000) # default to 24,000 would like to be able to control per topic
self.consumer = KafkaConsumer(
group_id="group1",
bootstrap_servers=bootstrap_server_address,
auto_offset_reset=self.offset,
value_deserializer=deserializer,
consumer_timeout_ms=self.timeout,
)
self.consumer.subscribe([self.topic_name])
def load(self):
log(f"Started listening topic: {self.topic_name}")
for message in self.consumer:
< process the message >
log(f"Load completed for topic: {self.topic_name}")
# read topics from file
with topics_list_file.open("r") as f_topics:
topics_read = json.load(f_topics)
# build listeners for each topic
for topic in topics:
listeners[topic] = KafkaListener(topics[topic])
# listen the topics, load data
while True:
for listener in listeners:
listener.load()
файл themes_list
[
{
"topic_name": "json_topic1",
"message_type": "json"
},
{
"topic_name": "avro_topic1",
"message_type": "avro",
"avro_schema": "avro_topic1_schema"
},
{
"topic_name": "avro.topic2",
"message_type": "avro",
"avro_schema": "avro_topic2_schema"
},
{
"topic_name": "json.topic2",
"message_type": "json",
"encrypted": "True"
}
]
отрывок из последний файл журнала
...
2020-04-23 13:03:46 Started listening topic: json_topic1
2020-04-23 20:10:09 Load completed for topic: json_topic1
...
В этом примере слушатель ждал около 7 часов, прежде чем вернуть данные.
Как я могу предотвратить эту задержку?
Любой другой подход, который я должен использовать для этого варианта использования?
Обновление: добавлен код для установки topic names
и значения по умолчанию для offset
и timeout