Мое приложение читает topi c из kafka и после его обогащения сохраняет его в другой topi c. StreamsConfig.NUM_STREAM_THREADS_CONFIG
настроен на 8
. У меня есть два брокера и 12 разделов.
Topic: enriched-request PartitionCount: 12 ReplicationFactor: 2 Configs: min.insync.replicas=1,flush.ms=86400000,segment.bytes=1073741824,flush.messages=1073741824,max.message.bytes=1000000,index.interval.bytes=4096,unclean.leader.election.enable=false,retention.bytes=-1,delete.retention.ms=259200000,segment.ms=604800000,segment.index.bytes=10485760
Topic: enriched-request Partition: 0 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 1 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 2 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 3 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 4 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 5 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 6 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 7 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 8 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 9 Leader: 8 Replicas: 8,7 Isr: 7,8
Topic: enriched-request Partition: 10 Leader: 7 Replicas: 7,8 Isr: 7,8
Topic: enriched-request Partition: 11 Leader: 8 Replicas: 8,7 Isr: 7,8
Уже две недели в моей тестовой среде я получаю сообщение журнала: INFO AbstractCoordinator:336 - [Consumer clientId=my-enrichments-client-StreamThread-4-consumer, groupId=my-enrichments] (Re-)joining group
, и это я получаю для всех 8 потоков. Каждые 5 минут это срабатывает, и каждый раз это новый набор из 8 потоков.
В моем kafka.log
я вижу: Member my-enrichments-client-StreamThread-4-consumer-6409090a-9d06-4bc0-8dd0-cd4c8bd28d71 in group my-enrichments has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
снова для всех 8 потоков. Как и выше, каждые 5 минут это новые наборы из 8 потоков, которые он удаляет.
В моей тестовой среде работает только одно приложение. Я пытался ждать 15-20 минут для повторного развертывания, но продолжаю получать ту же ошибку. Кто-нибудь знает, как я могу решить эту проблему, не меняя StreamsConfig.CLIENT_ID_CONFIG
или StreamsConfig.APPLICATION_ID_CONFIG
?
Конфигурация потребителя
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-enrichments");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "my-enrichments-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, my bootstrap sersvers);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
Если это поможет начало происходить примерно через неделю после того, как я увеличил количество потоков с 3
до 8
. Я не знаю, связано ли это с этим.