Kafka Consumer не получает сообщения после перезапуска - PullRequest
0 голосов
/ 14 октября 2019

После увеличения времени для session_timeout и request_timeout мой потребитель kafka не получает никаких сообщений, но он начинает получать сообщения обратно, если я перезагружаю свою систему. Затем, если я остановлюсь и перезапущу моего потребителя, тогда agin не получит никакого сообщения.

Вот так выглядит мой потребитель:

def __init__(self, group_id='default', topic='default', 
            bootstrap_servers=['localhost:9092']):
              self.topic = topic
              self.bootstrap_servers = bootstrap_servers
              self.group_id = group_id
              self.consumer = KafkaConsumer(
                                           self.topic,
        bootstrap_servers=self.bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=self.group_id,
        auto_commit_interval_ms=500,
        request_timeout_ms=3000000,
        session_timeout_ms=2500000,
        metrics_sample_window_ms=300000,
        value_deserializer=lambda x: loads(x.decode('utf-8')))
...