Как материализовать * оконный * KTable в тему Кафки - PullRequest
0 голосов
/ 04 мая 2018

Я пишу приложение KafkaStreams, которое принимает строковые значения из темы, где я хотел бы вывести конкатенацию значений определенного ключа за последние 5 минут, обновляя каждую минуту в другую (уплотненную) тему Kafka. У меня такое чувство, что я почти на месте, но мне пока не удалось. Я протестировал с простым:

grouped_transactions.toStream().foreach((key, value) -> {
    System.out.println(key.window().toString()+ key.key() + "    "+ value);
});

Что дает мне что-то вроде того, что вы видите ниже (я отфильтровал по ключу темы источника 00909, чтобы упростить отладку) То, что я не хочу, это для всех разных Windows с одним и тем же составным значением, я просто хочу свою конкатенацию расширяющейся строки.

Window{start=1525437120000, end=1525437420000}00909    "ABC",-554.53
Window{start=1525437360000, end=1525437660000}00909    "ABC",-554.53
Window{start=1525437240000, end=1525437540000}00909    "ABC",-554.53
Window{start=1525437300000, end=1525437600000}00909    "ABC",-554.53
Window{start=1525437180000, end=1525437480000}00909    "ABC",-554.53
Window{start=1525437120000, end=1525437420000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437180000, end=1525437480000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437240000, end=1525437540000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437300000, end=1525437600000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437360000, end=1525437660000}00909    "ABC",-554.53;"ABC",646.03

Ниже приведен весь код. Кто-нибудь знает, как это сделать? Заранее спасибо!

Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

StreamsBuilder builder = new StreamsBuilder();

long windowSizeMs = TimeUnit.MINUTES.toMillis(5); // 5 * 60 * 1000L
long advanceMs =    TimeUnit.MINUTES.toMillis(1); // 1 * 60 * 1000L
TimeWindows window = TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
KTable<Windowed<String>, String> grouped_transactions = source
        .filter((k,v)->k.equals("00909"))
        .groupByKey()
        .windowedBy(window)
        .reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("grouped_transactions_5_1"));

// THIS FAILS on runtime with
// java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed  
// cannot be cast to java.lang.String
grouped_transactions.toStream().to(GROUPEDTRANSACTIONS);


final KafkaStreams streams = new KafkaStreams(builder.build(), props);

1 Ответ

0 голосов
/ 05 мая 2018

Чего я не хочу, так это всех разных Windows с одним и тем же составным значением, я просто хочу свою конкатенацию расширяющейся строки.

Поскольку вы указываете перекрывающиеся окна, одна запись может содержаться в нескольких экземплярах окна. Возможно, вы хотите указать непересекающиеся окна, то есть окна с size == advance.

...