Метод metadataForKey в Kafka Streams дает неверную информацию для нескольких экземпляров приложения, подключенных к одной группе - PullRequest
0 голосов
/ 03 августа 2020

Я реализую механизм, который предоставляет некоторую информацию о метаданных, запрашивая их локально из магазина или запрашивая удаленный экземпляр Kafka Streams.

Я использую Scala и kafka-streams- scala библиотеку версии 2.4.1

Я постараюсь дать вам небольшой простой пример того, что я делаю

  1. Я запускаю кластер Kafka, который создает 1 тестовый топи c с 2 раздела.
  2. Также я запускаю 1 экземпляр Kafka Streams, как я упоминал выше, который реализует механизм запроса локальных или удаленных метаданных из хранилища и хранит всю информацию о разделах, пока другие экземпляры не будут подключены к той же группе .
  3. I pu sh некоторые записи в тестовые топи c
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));
Я запускаю второй экземпляр Kafka Streams, который подключается к той же группе, я вижу процесс перебалансировки и переназначения разделов, и, насколько я хорошо понимаю, оба приложения должны делиться между собой после этих разделов, например, приложение 1 Kafka Streams должно работать с разделом 0 и приложение 2 Kafka Streams должно работать с разделом 1 или наоборот после повторной балансировки и переназначения.

Следующий шаг, чтобы убедиться, что потоки Kafka работают таким образом, как я описал в шаге 4 I ' m, запустив следующий код.

val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()

while (it.hasNext) {
  val keyValue: KeyValue[String, String] = it.next();
  println(keyValue)
}

Очень круто, я вижу то, что ожидаю. Kafka Stream, который я запускаю на localhost, содержит раздел 1 после повторной балансировки и переназначения раздела.

KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)

НО когда я запускаю этот небольшой фрагмент кода, я вижу совершенно неожиданный результат с моей точки зрения.

println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

Насколько я понимаю, я должен ожидать чего-то вроде этого

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}

StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

1 Ответ

0 голосов
/ 06 августа 2020

Прежде всего, я хочу заметить, что metadataForKey дает вам некоторую информацию, даже если у вас нет записей в хранилище, и кажется, что эта информация, где размещен ключ, случайна.

Я понял, что проблема связана не с версией, а с сериализаторами.

Я вставил записи в topi c из java с помощью StringSerializer и из scala Я пробовал для запроса метаданных с помощью Serdes.String.serializer () , и он давал мне случайные результаты, которые не соответствовали действительности.

Я создал другой способ отправки данных в topi c с использованием scala с GenericPrimitiveSerdeString сериализатором ключей и тем же сериализатором для metadataForKey и, к моему удивлению, на этот раз сработало, как ожидалось.

Итак, для тех, кто будет использовать metadataForKey обратите внимание на сериализаторы ключей, чтобы этот метод работал правильно

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...