Кафка python изящное отключение потребителя - PullRequest
2 голосов
/ 28 апреля 2020

Я пытаюсь корректно завершить работу потребителя кафки, но сценарий блокируется с остановкой потока HeartBeat. Как можно изящно закрыть потребителя на SIGTERM с кафкой- python. Это то, что я сделал

import logger as logging
import time
import sys
from kafka import KafkaConsumer
import numpy as np
import signal


log = logging.getLogger(__name__)


class Cons:
    def __init__(self):

        signal.signal(signal.SIGINT, self.sigterm_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])

    def sigterm_handler(self, signum, frame):
        log.info("Sigterm handler")
        self.consumer.close(autocommit=False)
        sys.exit(0)



    def consume(self):
        try:
            while True:
                records = self.consumer.poll(timeout_ms=500, max_records=500)
                for topic_partition, consumer_records in records.items():
                    for record in consumer_records:
                        log.info("Got Record - {}".format(record))
                    #code to manually commit


        except ValueError as e:
            log.exception("exception")


if __name__ == '__main__':
    c=Cons()
    c.consume()

С включенными журналами отладки, это вывод, который я получаю, и код блокируется на этом.

^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread

В чем причина этого? и как правильно закрыть потребителя на SIGTERM или SIGINT?

...