Я наблюдал очень много перефасовок потребителей кафки, даже если поток ничего не потребляет. Я ожидал бы, что потребитель не перебалансирует в этом сценарии.Вот пример кода.
import argparse
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka import KafkaConsumer
import time
import sys
import logging
from kafka.consumer.subscription_state import ConsumerRebalanceListener
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def get_config(args):
config = {
'bootstrap_servers': args.host,
'group_id': args.group,
'key_deserializer': lambda msg: msg,
'value_deserializer': lambda msg: msg,
'partition_assignment_strategy': [RangePartitionAssignor],
'max_poll_records': args.records,
'auto_offset_reset': args.offset,
# 'max_poll_interval_ms': 300000,
# 'connections_max_idle_ms': 8 * 60 * 1000
}
return config
def start_consumer(args):
config = get_config(args)
consumer = KafkaConsumer(**config)
consumer.subscribe([args.topic],
listener=RepartitionListener(None))
for record in consumer:
print record.offset, record.partition
time.sleep(int(args.delay) / 1000.0)
class RepartitionListener(ConsumerRebalanceListener):
def __init__(self):
pass
def on_partitions_revoked(self, revoked):
print("partition revoked ")
for tp in revoked:
try:
print("[{}] revoked topic = {} partition = {}".format(time.strftime("%c"),
tp.topic, tp.partition))
partition_key = "{}_{}".format(tp.topic, str(tp.partition))
except Exception as e:
print("Got exception partition_key = {} {}".
format(tp, e.message))
def on_partitions_assigned(self, assigned):
pass
def main():
parser = argparse.ArgumentParser(
description='Tool to test consumer group with delay')
named_args = parser.add_argument_group('snamed arguments')
named_args.add_argument('-g', '--group', help='group id for the consumer',
required=True)
named_args.add_argument('-r', '--records', help='num records to consume',
required=True)
named_args.add_argument('-k', '--topic', help='kafka topic', required=True)
named_args.add_argument('-d', '--delay', help='add process delay in ms', required=True)
named_args.add_argument('-s', '--host', help='Kafka host format host:port', required=False)
parser.add_argument('-o', '--offset',
default='latest',
help='offset to read from earliest/latest')
args = parser.parse_args()
print args
start_consumer(args)
if __name__ == "__main__":
main()
- Как избежать триггера перебалансировки?Из журналов я вижу, что сердцебиение не работает, но я ожидаю, что сердцебиение будет продолжаться, даже если нет сообщений в течение периода времени, превышающего
session.time.out.ms
.
2019-02-27 20: 39: 43,281 - kafka.coordinator - ПРЕДУПРЕЖДЕНИЕ - истек сеанс пульса, отмечен координатор мертвым