У меня есть несколько проблем с созданием KTable с временным окном в Kafka.
Я хочу создать таблицу, которая будет подсчитывать количество идентификаторов в потоке, как это.
ID (String) | Count (Long)
X | 5
Y | 6
Z | 7
и пр. Я хочу получить таблицу, используя REST-API Kafka, желательно как .json.
Вот мой код на данный момент:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streams = builder.stream(srcTopic);
KTable<Windowed<String>, Long> numCount = streams
.flatMapValues(value -> getID(value))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));
Проблема, с которой я сейчас сталкиваюсь, заключается в том, что таблица создается не как <String, Long>
, а как <String, String>
. Это означает, что я не могу получить правильный номер счета, но вместо этого я получаю правильный ключ, но с искаженным счетом. Я пытался выдать его за Long
, используя Long.valueOf(value)
, но безуспешно. Я не знаю, как действовать отсюда. Нужно ли писать KTable в новую тему? Поскольку я хочу, чтобы к таблице можно было обращаться с использованием REST-API kafka, я не думаю, что она нужна, я прав? Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo")
должен сделать запрос доступным как "foo", верно?
KTable создает changelog
-топику, достаточно ли этого для того, чтобы сделать ее запрашиваемой? Или мне нужно создать новую тему для записи?
Я сейчас использую другой KStream для проверки вывода.
KStream<String, String> streamOut = builder.stream(srcTopic);
streamOut.foreach((key, value) -> System.out.println(key + " => " + value));
и выводит:
ID COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=
В любом случае, я не хочу использовать KStream для сбора выходных данных, я хочу запросить KTable. Но, как уже упоминалось, я не очень понимаю, как работает запрос ..
Обновление
удалось заставить его работать с
ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
long timeFrom = 0;
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
while (iterator.hasNext()) {
KeyValue<Long, Long> next = iterator.next();
long windowTimestamp = next.key;
System.out.println(windowTimestamp + ":" + next.value);
}
Заранее большое спасибо,