!! Тестовая среда
Производитель, отправляющий сообщение с ключом 'A' раз в секунду.
Kafka Streams (Окно кода Java) каждые 10 секунд.
Этот код сохраняет количество сообщений в окне в Ktable.
Мне бы хотелосьпросто дать список приложений, чтобы распечатать полученные сообщения.
!!Токовый выход (временной шаг, клавиша, Количество сообщений в акробатическом окне )
(1544079700000L, 'A', 10)
(1544079710000L, 'A', 10)
(1544079720000L, 'A', 10)
...
!!вывод, который я хочу (временной шаг, ключ, Список значений )
(1544079700000L, 'A',, =, [a, b, c, d, e, f, g, h, i, j])
(1544079710000L, 'A',, =, [k, l, m, n, o, p, u, x, y, z])
(1544079710000L, 'A',, =, [a, c, w, d, r, f, a, s, w, d])
...
!!код
package io.github.timothyrenner.kstreamex.tumblingwindow;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
import java.util.Random;
/** Demonstrates tumbling windows.
*
* @author Timothy Renner
*/
public class TumblingWindowKafkaStream {
/** Runs the streams program, writing to the "long-counts-all" topic.
*
* @param args Not used.
*/
public static void main(String[] args) throws Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"tumbling-window-kafka-streams");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9090,localhost:9091,localhost:9092");
config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
"localhost:2181");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<byte[], byte[]> longs = builder.stream(
Serdes.ByteArray(), Serdes.ByteArray(), "t2");
// The tumbling windows will clear every ten seconds.
KTable<Windowed<byte[]>, Long> longCounts =
longs.groupByKey()
.count(TimeWindows.of(10000L)
.until(10000L),
"t2-counts");
// Write to topics.
longCounts.toStream((k,v) -> k.key())
.to(Serdes.ByteArray(),
Serdes.Long(),
"t2-counts-all");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
} // Close main.
} // Close TumblingWindowKafkaStream.