нечитаемый вывод счетчика потока потока kafka - PullRequest
0 голосов
/ 01 мая 2018

Я пытаюсь рассчитать окно с примером количества слов. Он работает нормально, за исключением того, что вывод частично не читается.

Код:

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    TimeWindows window = TimeWindows.of(TimeUnit.MINUTES.toMillis(1)).advanceBy(TimeUnit.MINUTES.toMillis(1));

    KStream<String, String> textLines = builder.stream("streams-plaintext-input");
    KTable<Windowed<String>, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .windowedBy(window)
        .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store"));
    wordCounts.toStream().to("streams-plaintext-output", Produced.with(windowedSerde, Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

Выход:

kafka c[??   1
yaya c[??    1
kafka c[??   2

Я думаю, что нечитаемая часть может быть продолжительностью окна. Что я могу сделать, чтобы сделать его читаемым?

EDIT:

Пытался использовать windowedSerde для печати вывода:

    KStream<Windowed<String>, Long> output = builder.stream("streams-plaintext-output");
    output.print(windowedSerde, Serdes.Long());

Это все еще не работает.

1 Ответ

0 голосов
/ 01 мая 2018

При чтении из темы необходимо использовать десериализатор, подходящий для сериализатора, который использовался для создания темы. В этом случае вам нужно использовать windowDeserializer, который вы уже строите следующим образом:

WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
...