Как обработать сообщение Кафки в течение длительного времени (4-60 минут), без автоматической фиксации, и зафиксировать его без перебалансировки - PullRequest
0 голосов
/ 14 апреля 2020

Как использовать сообщение Kafka без автоматической фиксации, обрабатывать его в течение длительного времени (4-60 минут) и фиксировать, не подвергаясь перебалансировке, а также переназначать разделы или блокировать использование других сообщений другими пользователями группы.

Я использую потребителя Python 3.8 Kafka, чтобы:

  • Использовать по одному сообщению за раз, без автоматической фиксации.
  • Запуск длительной работы скрипт (чтение его стандартного вывода в Python)
  • Условно зафиксируйте сообщение.

Моя проблема в том, что часто разделы Kafka перебалансируются с другим членом группы потребителей.

После заливки документации я попытался поиграть со следующими свойствами конфигурации:

  • session_timeout_ms
  • request_timeout_ms
  • max_poll_interval_ms

    from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition
    
    def consume_one_message_at_a_time(conf):
    
    conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
    group_id = conf.group_id
    group_conf = conf.consumer_groups[group_id]
    
    kafka_brokers = conf.KAFKA_BROKERS
    topic = group_conf.subscribe[0].name
    
    print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_brokers,
        group_id=group_id,
        enable_auto_commit=False,
        max_poll_records=1,
        max_poll_interval_ms=1800000,
        # session_timeout_ms=1800000,
        # request_timeout_ms=1800002,
        # connections_max_idle_ms=1800003
        # heartbeat_interval_ms=1800000,
    )
    
    print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
    consumer.subscribe([topic])
    
    for message in consumer:
        print(f"message is of type: {type(message)}")
    
        if not group_conf.use_cmd:
            do_something_time_consuming(message)
        else:
            if group_id == 'bots' and check_bot_id(message):
    
                bot_action(conf, group_conf, message)
            else:
                print(f'no action for group_id: {group_id}')
                print(f'key  : {message.key}')
                print(f'value: {message.value}')
    
        meta = consumer.partitions_for_topic(message.topic)
    
        partition = TopicPartition(message.topic, message.partition)
        offsets = OffsetAndMetadata(message.offset + 1, meta)
        options = {partition: offsets}
    
        print(f'\noptions: {options}\n')
    
        response = consumer.commit(offsets=options)
    

Когда другие члены группы подписываются или заканчивают sh свои работы и потребляют, я получаю эту ошибку:

    Traceback (most recent call last):
  File "./consumer_one_at_a_time.py", line 148, in <module>
    consume_one_message_at_a_time(_conf)
  File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
    response = consumer.commit(offsets=options)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
    self._coordinator.commit_offsets_sync(offsets)
  File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

После добавления этих конфигураций я обнаружил, что новые потребители заблокированы! Т.е. не потреблять сообщения, пока одно не будет зафиксировано!

session_timeout_ms=1800000,
request_timeout_ms=1800002,
connections_max_idle_ms=1800003
# heartbeat_interval_ms=1800000,

Я прочитал, что фоновый поток должен посылать сердцебиение. Есть ли способ отправить сердцебиение без опроса?

1 Ответ

3 голосов
/ 14 апреля 2020

Есть ли способ отправить сердцебиение без опроса?

Это уже работает так. Heartbeat отправляется через отдельный поток в Kafka начиная с версии 0.10.1.0. (Вы можете проверить это для получения дополнительной информации)

В общем случае происходит перебалансировка в следующих ситуациях:

  • новый потребитель присоединяется к группе потребителей
  • Добавление новые разделы
  • Чистое отключение потребителя
  • Кафка считает потребителя мертвым
    • Истекает session.timeout.ms без отправки пульса
    • Срок действия max.poll.timeout.ms истекает без отправки запроса на опрос

Похоже, ваша ситуация является последней. Вы опрашиваете записи, но не опрашиваете снова через max.poll.interval.ms (в вашем случае 30 минут) из-за длительного процесса. Чтобы решить эту проблему:

  • Вы можете увеличить max.poll.interval.ms. Но это может привести к слишком долгим перебалансировкам. Потому что rebalance.timeout = max.poll.interval.ms. После запуска восстановления баланса все потребители в группе потребителей отменяются, и Kafka ждет всех потребителей, которые все еще отправляют тактовое сообщение в poll () (путем опроса потребителей отправляют joinGroupRequest в этот момент), пока не истечет время ожидания перебалансировки, равное max.poll.interval.ms. Допустим, вы установили max.poll.interval.ms на 60 минут, и ваш процесс занимает 50 минут до окончания sh. Если перебалансировка начнется из-за одной из причин, которые я упомянул выше на пятой минуте вашего длинного процесса, то Kafka будет ждать вашего покупателя в течение 45 минут. В течение этого периода времени все потребители будут отозваны. (потребление будет полностью остановлено для этой группы потребителей) Так что это не очень хорошая идея, ИМХО. (конечно, это зависит от ваших потребностей)
  • Другое решение - не использовать Kafka для такого рода длительных операций. Потому что Кафка не подходит для длительной обработки. Вы можете сохранить метаданные о длинных процессах как часть потребления сообщений с помощью Kafka, а затем выполнить соответствующую операцию без использования Kafka.
...