Я пишу приложение KafkaStreams и устанавливаю Maximum.num.threads как единое целое. У меня три темы и 6,8,8 разделов соответственно. В настоящее время выполняется эта потоковая топология с 4 экземплярами, поэтому 4 потоковых потока работают.
Я получаю INCOMPLETE_SOURCE_TOPIC_METADATA в одной из моих тем кафки. Я нашел ниже код из github, который выдает эту ошибку и пытается понять код
final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} during assignment. Returning error {}.",
topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return new GroupAssignment(
errorAssignment(clientMetadataMap, topic,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
);
}
}
for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), topic);
}
}
Мои вопросы:
Эта ошибка возникает из-за несоответствия разделов в темах Кафки или TopicsInfo
недоступен в то время (подумайте, что группа Кафки потеряла доступ к топике Кафки c)?
что означает topicsInfo.repartitionSourceTopics
call?