Как перебирать ключевые значения таблицы потоков Кафки - PullRequest
0 голосов
/ 12 октября 2018

Я новичок в потоках Kafka, и я попытался перебрать элементы в таблице Streams kafka через keyValueStore:

Идея состоит в том, чтобы создать Ktable (я также пробовал с globalKTable) сKeyValueStore.Затем отдельный поток отвечает за чтение содержимого KeyValueStore, чтобы выполнить итерацию по последнему значению каждого ключа.

      val streamProperties: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getStringList("kafka.bootstrap.servers").toList.mkString(","))
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
      p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.state.KeyValueStore


    val globalTable = builder.table("test",
      Materialized
        .as[String, Array[Byte], KeyValueStore[org.apache.kafka.common.utils.Bytes, Array[Byte]]]("TestStore")
        .withCachingDisabled()
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.ByteArray())
    )

    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamProperties)
    streams.start()

    val ex = new ScheduledThreadPoolExecutor(1)
    val task = new Runnable {
      def run() = {
        println("\n\n\n tick \n\n\n")
        try {
          val keyValueStore = streams.store(globalTable.queryableStoreName(), QueryableStoreTypes.keyValueStore())
          keyValueStore.all().toIterator.map { k =>
            print(k.key)
          }
        } catch {
          case _ => println("error")
        }
      }
    }
    val f = ex.scheduleAtFixedRate(task, 1, 10, TimeUnit.SECONDS)
  }
}

В потоке keyValueStore остается пустым, даже когда я создаю элементы по теме "test".

Что-то я пропустил или не понял?

...