События, которые должны генерироваться KTable - PullRequest
2 голосов
/ 15 апреля 2019

Я пытаюсь протестировать топологию, которая как последний узел имеет таблицу KTable.В моем тесте используется полноценный Kafka Cluster (через изображения Docker), поэтому я не использую TopologyTestDriver.

В моей топологии вводятся значения типа ключ-значение String -> Customerи выход String -> CustomerMapped.Серды, схемы и интеграция с Реестром схем работают как положено.

Я использую Scala, Kafka 2.2.0, Confluent Platform 5.2.1 и kafka-streams-scala.Моя топология, максимально упрощенная, выглядит примерно так:

val otherBuilder = new StreamsBuilder()

otherBuilder
     .table[String,Customer](source)
     .mapValues(c => CustomerMapped(c.surname, c.age))
     .toStream.to(target)   

(все неявные serdes, Produced, Consumed и т. Д. Являются значениями по умолчанию и найдены правильно)

Мой тест состоит в отправке нескольких записей (data) по теме source синхронно и без пауз и чтении из темы target, я сравниваю результаты с expected:

val data: Seq[(String, Customer)] = Vector(
   "key1" -> Customer(0, "Obsolete", "To be overridden", 0),
   "key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
   "key1" -> Customer(1, "Billy", "The Man", 32),
   "key2" -> Customer(2, "Tommy", "The Guy", 31),
   "key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
   "key1" -> CustomerMapped("The Man", 32),
   "key2" -> CustomerMapped("The Guy", 31),
   "key3" -> CustomerMapped("The Lady", 40)
)

Я создаю приложение Kafka Stream, устанавливая между другими настройками следующие два:

p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)

Поэтому я ожидаю, что KTable будет использовать кеширование с интервалом в 5 секунд между коммитами и размером кеша50 МБ (более чем достаточно для моего сценария).

Моя проблема в том, что результаты, которые я прочитал в теме target, всегда содержат несколько записей для key1.Я бы не ожидал, что для записей с Obsolete и `Obsolete1 генерируется событие.Фактический результат:

Vector(
    "key1" -> CustomerMapped("To be overridden", 0),
    "key1" -> CustomerMapped("To be overridden2", 0),
    "key1" -> CustomerMapped("The Man", 32),
    "key2" -> CustomerMapped("The Guy", 31),
    "key3" -> CustomerMapped("The Lady", 40)
)

И еще одна заключительная вещь: этот тест работал, как и ожидалось, до тех пор, пока я не обновил Kafka с 2.1.0 до 2.2.0.Я подтвердил это понижение моей заявки снова.

Я в замешательстве, кто-нибудь может указать, изменилось ли что-то в поведении KTables в версиях 2.2.x?Или, может быть, теперь есть новые настройки, которые я должен установить, чтобы контролировать передачу событий?

1 Ответ

4 голосов
/ 16 апреля 2019

В Kafka 2.2 была введена оптимизация для сокращения использования ресурсов Kafka Streams.KTable не обязательно материализуется, если оно не требуется для вычисления.Это верно для вашего случая, потому что mapValues() может быть вычислено на лету.Поскольку KTable не материализован, кэш-память отсутствует и, следовательно, каждая входная запись создает одну выходную запись.

Сравнение: https://issues.apache.org/jira/browse/KAFKA-6036

Если вы хотите применить материализацию KTable,Вы можете передать Materilized.as("someStoreName") в StreamsBuilder#table() метод.

...