Я пытаюсь использовать Kafka Topi c, используя confluent-kafka- python, для совершенно новой группы потребителей (GROUP_NAME_CONNECT5 в приведенном ниже примере). Кажется, это не работает, если я сначала не использую kafka-console-consumer, используя эту новую группу потребителей! После того, как я использую kafka-console-consumer только один раз, потребитель confluent-kafka- python работает нормально !! Любая идея, почему?
Свойства:
[Kafka]
bootstrap_servers =
ssl_ca_location =
max_wait_cycles = 30
имя_группы = GROUP_NAME_CONNECT5
client_id = CLIENT_ID
auto_commit_interval_ms = 5000
Код:
kafkaConsumerConfig = {
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.GROUP_NAME,
'client.id': config.CLIENT_ID,
'session.timeout.ms': 60000,
'heartbeat.interval.ms': 3000,
'security.protocol': 'SASL_SSL',
'sasl.kerberos.service.name': 'kafka',
'sasl.mechanisms': 'GSSAPI',
'ssl.ca.location': config.SSL_CA_LOCATION,
'sasl.kerberos.kinit.cmd': 'kinit -S {0} {1} -k -t {2}'.format(config.KEYTAB_PRINCIPAL, config.KEYTAB_USER, config.KEYTAB_PATH),
'default.topic.config': {
'enable.auto.commit': 'false',
'enable.auto.offset.store': 'false'
}
c = Consumer(**self.kafkaConsumerConfig)
c.subscribe([self.TOPIC_NAME])
while True:
kafka_msg = c.poll(1.0)
process_message(kafka_msg)
Журналы: после запуска Python Потребитель для 5 минут (а потом убили его). Потребитель Python не смог принять ни одного сообщения!
% 7 | 1579623452.300 | INIT | CLIENT_ID # consumer-1 | [thrd: app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID # потребитель-1 инициализирован (builtin.features gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, плагины, Gxuth 10, sasl_asl PKGCONFIG УСТАНОВИТЬ GNULD LDS LIBDL ПЛАГИНЫ ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, отладка 0x2000)% 7 | 1579623452.303 | SUBSC [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписаться на новую подписку на 1 тему (присоединиться к состоянию init)% 7 | 1579623452.303 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» перебалансируется в состоянии init (init-состоянии соединения) без назначения: отписаться% 7 | 1579623453.441 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": откладывает присоединение до тех пор, пока не будут доступны обновленные метаданные% 7 | 1579623453.443 | REJOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписка обновлена после изменения метаданных: присоединение к группе% 7 | 1579623453.443 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии up (инициализация состояния соединения) без назначения: группа воссоединяется% 7 | 1579623455.300 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: sasl_ssl: // xxxx / 159: Присоединение к группе «GROUP_NAME_CONNECT5» с 1 подписанным topi c (s)% 7 | 1579623458.305 | ASSIGNOR | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: запускается присваиватель «range» для 1 участника (ов)% 7 | 1579623458.309 | ASSIGN | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: новое назначение 8 разделов в состоянии соединения wait-assign-rebalance_cb% 7 | 1579623458.309 | OFFSET | CLIENT_ID # consumer-1 | [thrd: main]: GroupCoordinator / 159: выборка принятых смещений для 8/8 раздела (ов)% 7 | 1579623458.312 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [2] начинает выборку со смещением 123698% 7 | 1579623458.313 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [7] начинает выборку со смещением 116555% 7 | 1579623458.465 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [4] начинает выборку со смещением 106800% 7 | 1579623458.484 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [1] начинает выборку со смещением 107557% 7 | 1579623458.485 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [6] начинает выборку со смещением 109805% 7 | 1579623458.486 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [0] начинает выборку со смещением 91465% 7 | 1579623458.487 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [3] начинает выборку со смещением 102042% 7 | 1579623458.487 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [5] начинает выборку со смещением 117214
запускает группы-кафки-потребителей
.. / bin / kafka-consumer-groups. sh - bootstrap -server xxx --command-config generator.properties --group GROUP_NAME_CONNECT5 --describe
Группа потребителей 'GROUP_NAME_CONNECT5' не имеет активных членов.
затем я запускаю kafka- консольный потребитель использует ту же новую группу потребителей (GROUP_NAME_CONNECT5). Он потребляет все сообщения в Topi c.
.. / bin / kafka-consumer-groups. sh - bootstrap -server xxx --command-config generator.properties - group GROUP_NAME_CONNECT5 - опишите предупреждение виртуальной машины 64-битного сервера OpenJDK: если ожидается, что число процессоров увеличится с одного, то вам следует соответствующим образом настроить число параллельных потоков G C, используя -XX: ParallelGCThreads = N группа потребителей 'GROUP_NAME_CONNECT5 'не имеет активных членов.
TOPI C Идентификатор потребителя LAG-END-OFFSET LAG-ENS-OFFSET ОТДЕЛЕНИЕ ТЕКУЩЕГО РАЗМЕРА ХОЗЯЙСТВЕННЫЙ КЛИЕНТ-ID che_silo_cnsld_rpt_mthly 1 112644 112644 0 - - che_silo_cnsld_rilo_t_0_30_30 --_99_30_m_99 0 95715 95715 0 - - - che_silo_cnsld_rpt_mthly 3 106932 106932 0 - - - che_silo_cnsld_rpt_mthly 4 112588 112588 0 - - - che_silo_cnsld_rpt_mthly 7 122047 122047 0 - - - che_silo_cnsld_rpt_mthly 2 129940 129940 0 - - - che_silo_cnsld_rpt_mthly 6 115050 115050 0 - - - * одна тысяча тридцать шесть *
Затем я создаю несколько новых сообщений в топи c и запустите python -потребителя ... и на этот раз python -потребителя успешно запустится!
% 7 | 1579624630.644 | INIT | CLIENT_ID # consumer-1 | [thrd: app]: librdkafka v1.2.1 (0x10201ff) CLIENT_ID # потребитель-1 инициализирован (buildin.features gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, плагины, Gxuth 10, sasl_asl PKGCONFIG УСТАНОВИТЕ GNULD LDS LIBDL ПЛАГИНЫ ZLIB SSL SASL_CYRUS HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, отладка 0x2000)% 7 | 1579624630.648 | SUBRIC | SUBSC [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписаться на новую подписку на 1 тему (присоединиться к состоянию init)% 7 | 1579624630.648 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии init (init-состоянии соединения) без назначения: отписаться% 7 | 1579624631.807 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": откладывает присоединение до тех пор, пока не появятся обновленные метаданные% 7 | 1579624631.808 | REJOIN | CLIENT_ID # consumer-1 | [thrd: main]: Группа "GROUP_NAME_CONNECT5": подписка обновлена после изменения метаданных: присоединяется к группе% 7 | 1579624631.808 | REBALANCE | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5» выполняет перебалансировку в состоянии up (инициализация состояния соединения) без назначения: группа воссоединяется% 7 | 1579624633.644 | JOIN | CLIENT_ID # consumer-1 | [thrd: main]: sasl_ssl: //cilhdkfs0304.sys.cigna.com: 9095/159: Присоединение к группе «GROUP_NAME_CONNECT5» с 1 подпиской topi c (s)% 7 | 1579624636.650 | ASSIGNOR | CLIENT_ID # customer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: запускается присваиватель «range» для 1 участника (ов)% 7 | 1579624636.654 | ASSIGN | CLIENT_ID # consumer-1 | [thrd: main]: Группа «GROUP_NAME_CONNECT5»: новое назначение 8 разделов в состоянии соединения wait-assign-rebalance_cb% 7 | 1579624636.654 | OFFSET | CLIENT_ID # consumer-1 | [thrd: main]: GroupCoordinator / 159: выборка принятых смещений для 8/8 раздела (ов)% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [0] начинает выборку со смещением 91465% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [1] начинает выборку со смещением 107557% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [2] начинает выборку со смещением 123698% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [3] начинает выборку со смещением 102042% 7 | 1579624636. 656 | FETCH | CLIENT_ID # потребитель-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [4] начинает выборку со смещением 106800% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [5] начинает выборку со смещением 117214% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [6] начинает выборку со смещением 109805% 7 | 1579624636.656 | FETCH | CLIENT_ID # consumer-1 | [thrd: main]: раздел che_silo_cnsld_rpt_mthly [7] начинает выборку со смещением 116555