Я пытаюсь рассчитать окно с примером количества слов. Он работает нормально, за исключением того, что вывод частично не читается.
Код:
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
TimeWindows window = TimeWindows.of(TimeUnit.MINUTES.toMillis(1)).advanceBy(TimeUnit.MINUTES.toMillis(1));
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
KTable<Windowed<String>, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.windowedBy(window)
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("streams-plaintext-output", Produced.with(windowedSerde, Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Выход:
kafka c[?? 1
yaya c[?? 1
kafka c[?? 2
Я думаю, что нечитаемая часть может быть продолжительностью окна.
Что я могу сделать, чтобы сделать его читаемым?
EDIT:
Пытался использовать windowedSerde для печати вывода:
KStream<Windowed<String>, Long> output = builder.stream("streams-plaintext-output");
output.print(windowedSerde, Serdes.Long());
Это все еще не работает.