KTable с окном выдает неправильный тип - PullRequest
0 голосов
/ 08 мая 2018

У меня есть несколько проблем с созданием KTable с временным окном в Kafka.

Я хочу создать таблицу, которая будет подсчитывать количество идентификаторов в потоке, как это.

ID (String) |  Count (Long)
    X       |       5
    Y       |       6
    Z       |       7

и пр. Я хочу получить таблицу, используя REST-API Kafka, желательно как .json.

Вот мой код на данный момент:

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> streams = builder.stream(srcTopic);

    KTable<Windowed<String>, Long> numCount = streams
            .flatMapValues(value -> getID(value))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));

Проблема, с которой я сейчас сталкиваюсь, заключается в том, что таблица создается не как <String, Long>, а как <String, String>. Это означает, что я не могу получить правильный номер счета, но вместо этого я получаю правильный ключ, но с искаженным счетом. Я пытался выдать его за Long, используя Long.valueOf(value), но безуспешно. Я не знаю, как действовать отсюда. Нужно ли писать KTable в новую тему? Поскольку я хочу, чтобы к таблице можно было обращаться с использованием REST-API kafka, я не думаю, что она нужна, я прав? Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo") должен сделать запрос доступным как "foo", верно?

KTable создает changelog -топику, достаточно ли этого для того, чтобы сделать ее запрашиваемой? Или мне нужно создать новую тему для записи?

Я сейчас использую другой KStream для проверки вывода.

KStream<String, String> streamOut = builder.stream(srcTopic);

streamOut.foreach((key, value) -> System.out.println(key + " => " + value));

и выводит:

 ID    COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=

В любом случае, я не хочу использовать KStream для сбора выходных данных, я хочу запросить KTable. Но, как уже упоминалось, я не очень понимаю, как работает запрос ..

Обновление

удалось заставить его работать с

    ReadOnlyWindowStore<String, Long> windowStore =
            kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
        long timeFrom = 0;
        long timeTo = System.currentTimeMillis(); // now (in processing-time)
        WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
        while (iterator.hasNext()) {
          KeyValue<Long, Long> next = iterator.next();
          long windowTimestamp = next.key;
          System.out.println(windowTimestamp + ":" + next.value);
        }

Заранее большое спасибо,

1 Ответ

0 голосов
/ 08 мая 2018

Тип вывода KTable равен <Windowed<String>,String>, поскольку в потоках Kafka несколько окон поддерживаются параллельно, что позволяет обрабатывать неупорядоченные данные. Таким образом, это не случай, когда существует один экземпляр окна, но много экземпляров окна параллельно. (ср. https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows)

Сохранение «старых» окон позволяет обновлять их, когда данные поступают поздно. Обратите внимание, что семантика Kafka Streams основана на времени события.

Вы все еще можете запросить KTable - вам нужно только знать, какое окно вы хотите запросить.

Обновление

JavaDoc описывает, как запросить таблицу: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java#L94-L101

KafkaStreams streams = ... // counting words
Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());

String key = "some-word";
long fromTime = ...;
long toTime = ...;
WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...