Я реализую механизм, который предоставляет некоторую информацию о метаданных, запрашивая их локально из магазина или запрашивая удаленный экземпляр Kafka Streams.
Я использую Scala и kafka-streams- scala библиотеку версии 2.4.1
Я постараюсь дать вам небольшой простой пример того, что я делаю
- Я запускаю кластер Kafka, который создает 1 тестовый топи c с 2 раздела.
- Также я запускаю 1 экземпляр Kafka Streams, как я упоминал выше, который реализует механизм запроса локальных или удаленных метаданных из хранилища и хранит всю информацию о разделах, пока другие экземпляры не будут подключены к той же группе .
- I pu sh некоторые записи в тестовые топи c
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));
Я запускаю второй экземпляр Kafka Streams, который подключается к той же группе, я вижу процесс перебалансировки и переназначения разделов, и, насколько я хорошо понимаю, оба приложения должны делиться между собой после этих разделов, например, приложение 1 Kafka Streams должно работать с разделом 0 и приложение 2 Kafka Streams должно работать с разделом 1 или наоборот после повторной балансировки и переназначения.
Следующий шаг, чтобы убедиться, что потоки Kafka работают таким образом, как я описал в шаге 4 I ' m, запустив следующий код.
val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()
while (it.hasNext) {
val keyValue: KeyValue[String, String] = it.next();
println(keyValue)
}
Очень круто, я вижу то, что ожидаю. Kafka Stream, который я запускаю на localhost, содержит раздел 1 после повторной балансировки и переназначения раздела.
KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)
НО когда я запускаю этот небольшой фрагмент кода, я вижу совершенно неожиданный результат с моей точки зрения.
println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
Насколько я понимаю, я должен ожидать чего-то вроде этого
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}