Не может потреблять из Kafka Topi c с использованием confluent-kafka- python для новой группы потребителей - PullRequest
0 голосов
/ 20 января 2020

Я пытаюсь использовать Kafka Topi c, используя confluent-kafka- python, для совершенно новой группы потребителей (GROUP_NAME_CONNECT5 в приведенном ниже примере). Кажется, это не работает, если я сначала не использую kafka-console-consumer, используя эту новую группу потребителей! После того, как я использую kafka-console-consumer только один раз, потребитель confluent-kafka- python работает нормально !! Любая идея, почему?

Свойства:

[Kafka]

bootstrap_servers =

ssl_ca_location =

max_wait_cycles = 30

имя_группы = GROUP_NAME_CONNECT5

client_id = CLIENT_ID

auto_commit_interval_ms = 5000

Код:

kafkaConsumerConfig = {
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.GROUP_NAME,
    'client.id': config.CLIENT_ID,
    'session.timeout.ms': 60000,
    'heartbeat.interval.ms': 3000,

    'security.protocol': 'SASL_SSL',
    'sasl.kerberos.service.name': 'kafka',

    'sasl.mechanisms': 'GSSAPI',
    'ssl.ca.location': config.SSL_CA_LOCATION,
    'sasl.kerberos.kinit.cmd': 'kinit -S {0} {1} -k -t {2}'.format(config.KEYTAB_PRINCIPAL, config.KEYTAB_USER, config.KEYTAB_PATH),
    'default.topic.config': {
        'enable.auto.commit': 'false',
        'enable.auto.offset.store': 'false'
    }

c = Consumer(**self.kafkaConsumerConfig)

c.subscribe([self.TOPIC_NAME])

while True:
    kafka_msg = c.poll(1.0)
    process_message(kafka_msg)

Журналы: после запуска Python Потребитель для 5 минут (а потом убили его). Потребитель Python не смог принять ни одного сообщения!

% 7 | 1579623452.300 | INIT | CLIENT_ID # consumer-1 | [thrd: app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID # потребитель-1 инициализирован (builtin.features gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, плагины, Gxuth 10, sasl_asl PKGCONFIG УСТАНОВИТЬ GNULD LDS LIBDL ПЛАГИНЫ ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, отладка 0x2000)% 7 | 1579623452.303 | SUBSC [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписаться на новую подписку на 1 тему (присоединиться к состоянию init)% 7 | 1579623452.303 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» перебалансируется в состоянии init (init-состоянии соединения) без назначения: отписаться% 7 | 1579623453.441 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": откладывает присоединение до тех пор, пока не будут доступны обновленные метаданные% 7 | 1579623453.443 | REJOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписка обновлена ​​после изменения метаданных: присоединение к группе% 7 | 1579623453.443 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии up (инициализация состояния соединения) без назначения: группа воссоединяется% 7 | 1579623455.300 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: sasl_ssl: // xxxx / 159: Присоединение к группе «GROUP_NAME_CONNECT5» с 1 подписанным topi c (s)% 7 | 1579623458.305 | ASSIGNOR | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: запускается присваиватель «range» для 1 участника (ов)% 7 | 1579623458.309 | ASSIGN | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: новое назначение 8 разделов в состоянии соединения wait-assign-rebalance_cb% 7 | 1579623458.309 | OFFSET | CLIENT_ID # consumer-1 | [thrd: main]: GroupCoordinator / 159: выборка принятых смещений для 8/8 раздела (ов)% 7 | 1579623458.312 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [2] начинает выборку со смещением 123698% 7 | 1579623458.313 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [7] начинает выборку со смещением 116555% 7 | 1579623458.465 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [4] начинает выборку со смещением 106800% 7 | 1579623458.484 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [1] начинает выборку со смещением 107557% 7 | 1579623458.485 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [6] начинает выборку со смещением 109805% 7 | 1579623458.486 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [0] начинает выборку со смещением 91465% 7 | 1579623458.487 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [3] начинает выборку со смещением 102042% 7 | 1579623458.487 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [5] начинает выборку со смещением 117214

запускает группы-кафки-потребителей

.. / bin / kafka-consumer-groups. sh - bootstrap -server xxx --command-config generator.properties --group GROUP_NAME_CONNECT5 --describe

Группа потребителей 'GROUP_NAME_CONNECT5' не имеет активных членов.

затем я запускаю kafka- консольный потребитель использует ту же новую группу потребителей (GROUP_NAME_CONNECT5). Он потребляет все сообщения в Topi c.

.. / bin / kafka-consumer-groups. sh - bootstrap -server xxx --command-config generator.properties - group GROUP_NAME_CONNECT5 - опишите предупреждение виртуальной машины 64-битного сервера OpenJDK: если ожидается, что число процессоров увеличится с одного, то вам следует соответствующим образом настроить число параллельных потоков G C, используя -XX: ParallelGCThreads = N группа потребителей 'GROUP_NAME_CONNECT5 'не имеет активных членов.

TOPI C Идентификатор потребителя LAG-END-OFFSET LAG-ENS-OFFSET ОТДЕЛЕНИЕ ТЕКУЩЕГО РАЗМЕРА ХОЗЯЙСТВЕННЫЙ КЛИЕНТ-ID che_silo_cnsld_rpt_mthly 1 112644 112644 0 - - che_silo_cnsld_rilo_t_0_30_30 --_99_30_m_99 0 95715 95715 0 - - - che_silo_cnsld_rpt_mthly 3 106932 106932 0 - - - che_silo_cnsld_rpt_mthly 4 112588 112588 0 - - - che_silo_cnsld_rpt_mthly 7 122047 122047 0 - - - che_silo_cnsld_rpt_mthly 2 129940 129940 0 - - - che_silo_cnsld_rpt_mthly 6 115050 115050 0 - - - * одна тысяча тридцать шесть *

Затем я создаю несколько новых сообщений в топи c и запустите python -потребителя ... и на этот раз python -потребителя успешно запустится!

% 7 | 1579624630.644 | INIT | CLIENT_ID # consumer-1 | [thrd: app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID # потребитель-1 инициализирован (buildin.features gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, плагины, Gxuth 10, sasl_asl PKGCONFIG УСТАНОВИТЕ GNULD LDS LIBDL ПЛАГИНЫ ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, отладка 0x2000)% 7 | 1579624630.648 | SUBRIC | SUBSC [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписаться на новую подписку на 1 тему (присоединиться к состоянию init)% 7 | 1579624630.648 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии init (init-состоянии соединения) без назначения: отписаться% 7 | 1579624631.807 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": откладывает присоединение до тех пор, пока не появятся обновленные метаданные% 7 | 1579624631.808 | REJOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписка обновлена ​​после изменения метаданных: присоединяется к группе% 7 | 1579624631.808 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии up (инициализация состояния соединения) без назначения: группа воссоединяется% 7 | 1579624633.644 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: sasl_ssl: //cilhdkfs0304.sys.cigna.com: 9095/159: Присоединение к группе «GROUP_NAME_CONNECT5» с 1 подпиской topi c (s)% 7 | 1579624636.650 | ASSIGNOR | CLIENT_ID # customer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: запускается присваиватель «range» для 1 участника (ов)% 7 | 1579624636.654 | ASSIGN | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: новое назначение 8 разделов в состоянии соединения wait-assign-rebalance_cb% 7 | 1579624636.654 | OFFSET | CLIENT_ID # consumer-1 | [thrd: main]: GroupCoordinator / 159: выборка принятых смещений для 8/8 раздела (ов)% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [0] начинает выборку со смещением 91465% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [1] начинает выборку со смещением 107557% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [2] начинает выборку со смещением 123698% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [3] начинает выборку со смещением 102042% 7 | 1579624636. 656 | FETCH | CLIENT_ID # потребитель-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [4] начинает выборку со смещением 106800% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [5] начинает выборку со смещением 117214% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [6] начинает выборку со смещением 109805% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [7] начинает выборку со смещением 116555

1 Ответ

0 голосов
/ 22 января 2020

Полагаю, ваша проблема связана с конфигурацией auto.offset.reset. Когда ваш потребитель присоединяется к кластеру, эта конфигурация определяет, с какого смещения ваш потребитель начнет потреблять. Попробуйте ввести:

"auto.offset.reset": "earliest"

Эта конфигурация заставит вашего потребителя начать потребление с первого (самого раннего) смещения / сообщения. Конфигурация по умолчанию «самая большая», это означает, что потребитель начнет потреблять, когда в topi c появятся новые сообщения. Проверьте это для более подробной информации.

...