Как распечатать TimeWindowedKStream и KTable в потоках Kafka? - PullRequest
0 голосов
/ 07 августа 2020

У нас есть процесс Kafka, который принимает topi c в качестве входных данных и записывает временное окно в выходные topi c .. используется следующий код. Я хотел бы распечатать TimeWindowedKStream (groupedStream) и KTable (aggregatedTable) и посмотреть результат для некоторых целей отладки ..

    String intopic = input_topic;
    Long window = 60;
    String outtopic = output_topic;
    
    final Serde<String> stringSerde = Serdes.String();

    Properties property = new Properties();
    property.put("bootstrap.servers", "127.0.0.1:9092");
    property.put("group.id", "test-consumer-group");
    property.put("application.id", "sliding-window-min-bar");
    property.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
    property.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());

    Duration windowSizeMs = Duration.ofMinutes(window);

    StreamsBuilder builder = new StreamsBuilder();
    
    System.out.println(intopic);

    KStream<String, String> equitybar = builder.stream(intopic, Consumed.with(stringSerde, stringSerde));
    
    System.out.println(equitybar);
    
    equitybar.print(Printed.toSysOut());

    // convert string of csv to a double on the mean value
    KStream<String, String> transformedbar = equitybar
            .map((key, value) -> KeyValue.pair(key, value.substring(1,value.length()-2).split(",")[2]));

    System.out.println(transformedbar);

    transformedbar.print(Printed.toSysOut());
    
    // group by equity and sliding window
    
    System.out.println(windowSizeMs);
    System.out.println(TimeWindows.of(windowSizeMs).advanceBy(advanceMs));
    
    TimeWindowedKStream<String, String> groupedStream = transformedbar.groupByKey().windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs));

    System.out.println(groupedStream);
    KTable<Windowed<String>, String> aggregatedTable = groupedStream.aggregate(
            () -> "|",
            (aggKey, newValue, aggValue) -> aggValue + newValue.trim() + "|")      ; 
    

Я попытался распечатать его с помощью команды печати, которая используется для потоков Kafka - groupedStream.print (Printed.toSysOut ()); - но вроде не работает.

Спасибо.

1 Ответ

0 голосов
/ 18 августа 2020

KGroupedStream и TimeWindowedKStream - это «просто» вспомогательные классы, позволяющие DSL представлять плавный API-интерфейс для цепочки операторов без слишком большого количества перегрузок в одном классе.

В DSL есть только две основные абстракции, KStream и KTable, которые являются фактическими первоклассными контейнерами данных. Таким образом, сделать то, что вы хотите, невозможно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...