Надгробие Кафки с преобразованием DSL KStream в KTable - PullRequest
1 голос
/ 06 марта 2020

У меня есть KStream<String, X>, который я, по сути, хочу преобразовать в KTable<String, Y>

. Единственный способ, которым я мог бы добиться этого, используя DSL, - это карта, группа затем уменьшается.

val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
  .mapValues({ value -> toYOrNull(value)})
  .groupByKey(Grouped.with(Serdes.String(), ySerde))
  .reduce(
    {old: Y?, updated: Y? -> updated},
    Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
      .withKeySerde(Serdes.String()
      .withValueSerde(ySerde)
  )

Я ожидаю, что это обработает случай, когда значение updated в reduce равно null, однако, когда я проверяю хранилище с помощью TopologyTestDriver, оно все еще имеет старую версию. Что я делаю не так?

Это мой тест:

@Test
fun shouldDeleteFromTableWhenNull() {
  val store = testDriver.getKeyValueStore<String, Y?>("y-store")
  store.put("key", Y())

  inputTopic.pipeInput("key", anXThatMapsToANullY)

  assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}

Ответы [ 2 ]

1 голос
/ 10 марта 2020

В следующей версии Apache Kafka 2.5 добавлен новый оператор KStream#toTable() для решения этого варианта использования (ср. https://issues.apache.org/jira/browse/KAFKA-7658)

В более старых версиях вам потребуется использовать ненулевое «суррогатное значение удаления», чтобы избежать отбрасывания записи, и позволить вашей функции уменьшения вернуть null, если она увидит «суррогатное значение удаления».

1 голос
/ 08 марта 2020

Записи со значением null игнорируются.

Ожидаемое поведение в соответствии с документацией: KGroupedStream :: redu (...) Java Do c

Объедините значения записей в этом потоке по сгруппированному ключу. Записи с нулевым ключом или значением игнорируются

...