Как сделать так, чтобы KTable испускал только последние обновления? - PullRequest
0 голосов
/ 20 февраля 2020

My KTable испускается при каждом обновлении, а не только в последних обновлениях.

Пожалуйста, см. Код ниже (в Scala):

object SimpleTable extends App {
  val topic = "simple-table"

  val prodProps = new Properties()
  prodProps.put("bootstrap.servers", "localhost:9092")
  prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  prodProps.put("acks", "1")
  prodProps.put("retries", "3")

  val producer = new KafkaProducer[String, String](prodProps)

  producer.send(new ProducerRecord[String, String](topic, "key1", "value1"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value2"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value3"))
  producer.send(new ProducerRecord[String, String](topic, "key1", "value11"))
  producer.send(new ProducerRecord[String, String](topic, "key2", "value22"))
  producer.send(new ProducerRecord[String, String](topic, "key3", "value33"))

  producer.close()


  val streamProps = new Properties()
  streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-table-app1")
  streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  //streamProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group11")
  //streamProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "client11")
  //streamProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  //streamProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "18000")
  //streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10485760")
  //streamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
  //streamProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000")
  //streamProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1)
  //streamProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[WallclockTimestampExtractor])

  import org.apache.kafka.streams.scala.Serdes._
  implicit val consumeSerdes: Consumed[String, String] = Consumed.`with`[String, String]
  val builder = new StreamsBuilder()

  val simpleTable: KTable[String, String] = builder.table[String, String](topic)
  simpleTable.toStream.print(Printed.toSysOut[String, String].withLabel("simple-table"))


  val streams = new KafkaStreams(builder.build(), streamProps)
  streams.start()
  Thread.sleep(10000)
  streams.close()
}

Это приложение отображает это:

[simple-table]: key1, value1
[simple-table]: key2, value2
[simple-table]: key3, value3
[simple-table]: key1, value11
[simple-table]: key2, value22
[simple-table]: key3, value33

У меня должны быть только последние 3 строки. Пожалуйста помоги.

ОБНОВЛЕНИЕ

Согласно приведенному ниже решению, все работает хорошо, когда я создаю KTable следующим образом:

val simpleTable: KTable[String, String] =
    builder.table[String, String](topic, Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("simple-table-store"))

1 Ответ

0 голосов
/ 20 февраля 2020

Я получил ответ от этого Вопрос .

Код, используемый для работы со старой версией kafka-streams, старше 2.2.

CopyPasted

В Kafka 2.2 была введена оптимизация для сокращения использования ресурсов Kafka Streams. KTable не обязательно материализуется, если он не требуется для вычислений. Это верно для вашего случая, потому что mapValues ​​() может быть вычислена на лету. Поскольку таблица KTable не материализована, кэш-память отсутствует, и поэтому каждая входная запись создает одну выходную запись.

Сравните: https://issues.apache.org/jira/browse/KAFKA-6036

Если вы хотите применить KTable материализацию можно передать в Materilized.as ("someStoreName") в метод StreamsBuilder # table ().

...