Потребительский баланс Kafka происходит, даже если нет сообщений для потребления - PullRequest
0 голосов
/ 27 февраля 2019

Я наблюдал очень много перефасовок потребителей кафки, даже если поток ничего не потребляет. Я ожидал бы, что потребитель не перебалансирует в этом сценарии.Вот пример кода.

import argparse
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka import KafkaConsumer
import time
import sys
import logging
from kafka.consumer.subscription_state import ConsumerRebalanceListener

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


def get_config(args):
    config = {
        'bootstrap_servers': args.host,
        'group_id': args.group,
        'key_deserializer': lambda msg: msg,
        'value_deserializer': lambda msg: msg,
        'partition_assignment_strategy': [RangePartitionAssignor],
        'max_poll_records': args.records,
        'auto_offset_reset': args.offset,
        # 'max_poll_interval_ms': 300000,
        # 'connections_max_idle_ms': 8 * 60 * 1000
    }
    return config


def start_consumer(args):
    config = get_config(args)
    consumer = KafkaConsumer(**config)
    consumer.subscribe([args.topic],
                       listener=RepartitionListener(None))
    for record in consumer:
        print record.offset, record.partition
        time.sleep(int(args.delay) / 1000.0)



class RepartitionListener(ConsumerRebalanceListener):

    def __init__(self):
        pass

    def on_partitions_revoked(self, revoked):
        print("partition revoked ")
        for tp in revoked:
            try:
                print("[{}] revoked topic = {} partition = {}".format(time.strftime("%c"),
                                                                      tp.topic, tp.partition))
                partition_key = "{}_{}".format(tp.topic, str(tp.partition))

            except Exception as e:
                print("Got exception partition_key = {} {}".
                      format(tp, e.message))

    def on_partitions_assigned(self, assigned):
        pass


def main():
    parser = argparse.ArgumentParser(
        description='Tool to test consumer group with delay')
    named_args = parser.add_argument_group('snamed arguments')
    named_args.add_argument('-g', '--group', help='group id for the consumer',
                            required=True)
    named_args.add_argument('-r', '--records', help='num records to consume',
                            required=True)
    named_args.add_argument('-k', '--topic', help='kafka topic', required=True)
    named_args.add_argument('-d', '--delay', help='add process delay in ms', required=True)
    named_args.add_argument('-s', '--host', help='Kafka host format host:port', required=False)
    parser.add_argument('-o', '--offset',
                        default='latest',
                        help='offset to read from earliest/latest')

    args = parser.parse_args()
    print args
    start_consumer(args)


if __name__ == "__main__":
    main()
  1. Как избежать триггера перебалансировки?Из журналов я вижу, что сердцебиение не работает, но я ожидаю, что сердцебиение будет продолжаться, даже если нет сообщений в течение периода времени, превышающего session.time.out.ms.

2019-02-27 20: 39: 43,281 - kafka.coordinator - ПРЕДУПРЕЖДЕНИЕ - истек сеанс пульса, отмечен координатор мертвым

...