Потребительская группа Kafka Stream не показывает смещения - PullRequest
0 голосов
/ 28 сентября 2018

У меня есть два потока приложений kafka.

Я могу видеть там группы потребителей, используя:

>bin/kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092`
streams-distribution-app
streams-collection-app

Я могу видеть смещения дистрибутивного приложения, используя:

>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-distribution-app
TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                             HOST            CLIENT-ID
o365_user_activity       0          3900            3985            85              streams-distribution-app-StreamThread-1 /127.0.0.1      
o365_cassandra_data_load 0          -               0               -               streams-distribution-app-StreamThread-1 /127.0.0.1      streams-distribution-app

Но я не могу просмотреть смещения дляcollection-app:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app 
Note: This will not show information about old Zookeeper-based consumers.

Приложение работает и обрабатывает журналы.Я не понимаю, в чем может быть причина не описывать информацию о смещении.

ОБНОВЛЕНИЕ:

Через некоторое время (около 20 минут) я наконец могу увидеть смещения,Но есть большое отставание.

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app 
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID                             HOST        CLIENT-ID
o365_activity_contenturl 0          200             64941           64741   streams-collection-app-StreamThread-1   /127.0.0.1 streams-collection-app

Это конфиги для приложения stream-collection:

props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 200);

ОБНОВЛЕНИЕ 2:

Я попытался увеличить конфигурацию num.stream.threads и установить ее 4. И снова перезапустил приложение с более чем 40k записями в исходной теме.

Тем не менее не было никакого вывода при описании группы потребителей потокового приложения.Примерно через 10-15 минут я вижу следующие смещения:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                            HOST            CLIENT-ID
o365_activity_contenturl 0          200             11274           11074  streams-collection-app-StreamThread-1  /127.0.0.1      streams-app-StreamThread-1
o365_activity_contenturl 1          200             11384           11184  streams-collection-app-StreamThread-2  /127.0.0.1      streams-app-StreamThread-2
o365_activity_contenturl 3          200             11343           11143  streams-collection-app-StreamThread-4  /127.0.0.1      streams-app-StreamThread-4
o365_activity_contenturl 2          200             11471           11271  streams-collection-app-StreamThread-3  /127.0.0.1      streams-app-StreamThread-3
...