У меня есть служба Kafka Stream, которая, по-видимому, ни по какой причине не начинает иметь специфическую ошибку даже в локальных режимах работы.
Сервис My Stream выполняет несколько операций:
stream
.map(doSomething)
.filter(filterSomething)
.groupBy(groupMyStuffs)
.aggregate(Map.empty[String, Object])(aggregation)
.mapValues((k, v) => parseAggResults(k, v))
.toStream
.flatMap((_, v) => v)
.to(outputTopic)
Хорошо, выполняя мои тесты, я обнаруживаю, что мой сервис ломался после mapValues
, когда он вызывал функцию toStream
, которая запишет мои данные в новую тему, созданную потоками Kafka, конвертирующими KTable в поток Kafka.
Я проверил темы, созданные KStreams, и темы есть:
myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog
myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition
Тогда я попытался пойти немного дальше в этой проблеме. Я начал отлаживать классы клиентов Kafka, чтобы определить причину появления этой ошибки, и обнаружил, что в этой строке кода:
https://github.com/apache/kafka/blob/2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L966
Мне удалось обнаружить, что response
имеет три входа как topicMetadata
, и я не знаю, что это за третий:
(type=TopicMetadata, error=NONE, topic=myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition, isInternal=false, partitionMetadata=[(type=PartitionMetadata, error=NONE, partition=0, leader=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), replicas=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), isr=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), offlineReplicas=)])
(type=TopicMetadata, error=NONE, topic=myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog, isInternal=false, partitionMetadata=[(type=PartitionMetadata, error=NONE, partition=0, leader=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), replicas=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), isr=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), offlineReplicas=)])
(type=TopicMetadata, error=INVALID_TOPIC_EXCEPTION, topic=, isInternal=false, partitionMetadata=[])
И просто чтобы убедиться, что все в порядке, вот какая у меня конфигурация:
logger.info(s"Loading Kafka configurations")
logger.info(s"Kafka Connection with: ${getEnvVar("KAFKA_PROTOBUF_CONN")}")
logger.info(s"Consumer Name: ${getEnvVar("CONSUMER_STREAM_NAME")}")
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, getEnvVar("CONSUMER_STREAM_NAME"))
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getEnvVar("KAFKA_PROTOBUF_CONN"))
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getEnvVar("AUTO_OFFSET_RESET_CONFIG"))
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass.getName)
settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10.seconds)
settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
//Added to avoid messages created by old producers
settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor")
settings.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "1000")
settings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all")
settings.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 20.mb)
if (!isLocalRun)
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3")
Моя проблема в том, что наши развертывания работали, и внезапно все начинают иметь эту ошибку:
[kafka-producer-network-thread | myconsumergroup-1d0237ae-6caa-4cbd-aeaa-2154d2303b32-StreamThread-1-producer] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=myconsumergroup-1d0237ae-6caa-4cbd-aeaa-2154d2303b32-StreamThread-1-producer] Error while fetching metadata with correlation id 9 : {=INVALID_TOPIC_EXCEPTION}
Я пытаюсь найти, что не так, в чем проблема с этой конфигурацией или чем-то в этом роде, но безуспешно.
Это начало сбой даже на локальной машине, что может быть причиной?