Я пытаюсь протестировать топологию, которая как последний узел имеет таблицу KTable.В моем тесте используется полноценный Kafka Cluster (через изображения Docker), поэтому я не использую TopologyTestDriver
.
В моей топологии вводятся значения типа ключ-значение String -> Customer
и выход String -> CustomerMapped
.Серды, схемы и интеграция с Реестром схем работают как положено.
Я использую Scala, Kafka 2.2.0, Confluent Platform 5.2.1 и kafka-streams-scala
.Моя топология, максимально упрощенная, выглядит примерно так:
val otherBuilder = new StreamsBuilder()
otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target)
(все неявные serdes, Produced
, Consumed
и т. Д. Являются значениями по умолчанию и найдены правильно)
Мой тест состоит в отправке нескольких записей (data
) по теме source
синхронно и без пауз и чтении из темы target
, я сравниваю результаты с expected
:
val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
Я создаю приложение Kafka Stream, устанавливая между другими настройками следующие два:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
Поэтому я ожидаю, что KTable будет использовать кеширование с интервалом в 5 секунд между коммитами и размером кеша50 МБ (более чем достаточно для моего сценария).
Моя проблема в том, что результаты, которые я прочитал в теме target
, всегда содержат несколько записей для key1
.Я бы не ожидал, что для записей с Obsolete
и `Obsolete1 генерируется событие.Фактический результат:
Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
И еще одна заключительная вещь: этот тест работал, как и ожидалось, до тех пор, пока я не обновил Kafka с 2.1.0 до 2.2.0.Я подтвердил это понижение моей заявки снова.
Я в замешательстве, кто-нибудь может указать, изменилось ли что-то в поведении KTables в версиях 2.2.x?Или, может быть, теперь есть новые настройки, которые я должен установить, чтобы контролировать передачу событий?