После увеличения времени для session_timeout и request_timeout мой потребитель kafka не получает никаких сообщений, но он начинает получать сообщения обратно, если я перезагружаю свою систему. Затем, если я остановлюсь и перезапущу моего потребителя, тогда agin не получит никакого сообщения.
Вот так выглядит мой потребитель:
def __init__(self, group_id='default', topic='default',
bootstrap_servers=['localhost:9092']):
self.topic = topic
self.bootstrap_servers = bootstrap_servers
self.group_id = group_id
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=self.group_id,
auto_commit_interval_ms=500,
request_timeout_ms=3000000,
session_timeout_ms=2500000,
metrics_sample_window_ms=300000,
value_deserializer=lambda x: loads(x.decode('utf-8')))