Должно ли TimeWindows быть одинаковым при соединении двух KTable, полученных из TimeWindows - PullRequest
0 голосов
/ 20 июня 2019

Я использую два разных срока хранения для двух разных KTable, и он работает с состояниями RocksDB и темами изменений Kafka.

KTable генерируется из KStream и groupBy, а затем windowedBy.

Я считаю, что при объединении KStream с оконным управлением TimeWindows то же самое.Мне интересно, будет ли польза или недостаток, если параметры TimeWindows будут разными, при соединении двух разных таблиц KTable с помощью TimeWindows?

фрагмента кода:

final KStream<Integer, String> eventStream = builder.stream("events",
                        Consumed.with(Serdes.Integer(), Serdes.String())
                                .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

final KTable<Windowed<Integer>, String> eventWindowTable = eventStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).until(Duration.ofSeconds(100).toMillis()))
                .reduce((oldValue, newValue) -> newValue);

final KStream<Integer, String> clickStream = builder.stream("clicks",
                Consumed.with(Serdes.Integer(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

final KTable<Windowed<Integer>, String> clickWindowTable = clickStream.groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).until(Duration.ofSeconds(70).toMillis()))
                .reduce((oldValue, newValue) -> newValue);

final KTable<Windowed<Integer>, String> join = eventWindowTable.leftJoin(clickWindowTable,
                (event, click) -> event + " ; " + click + " ; " + Instant.now()
        );

Сначала я думалобъединение двух разных KTable с разными параметрами TimeWindows не будет работать, потому что объединение основано на TimeWindowedKey, ключе для временного интервала.Но после тестирования это тоже работает.

1 Ответ

0 голосов
/ 25 июня 2019

Объединение выполнено, потому что тип обеих клавиш одинаков: Windowed<Integer>. Соединение, конечно, даст результат, только если ключи одинаковы. Предположим, у вас есть следующие окна (обратите внимание, что для TimeWindows хранится только отметка времени начала окна):

eventWindowTable: <A,0>        <A,60>       

clickWindowTable: <A,0> <A,30> <A,60> <A,90>

В этом случае присоединятся только <A,0> и <A,60>. Следовательно, наличие разных окон влияет на ваш результат, потому что отметка времени начала окна является частью ключа, и некоторые окна никогда не присоединятся (например, <A,30> и <A,90> в нашем примере).

...