Можно ли преобразовать таблицу в поток в K SQL? - PullRequest
0 голосов
/ 13 января 2020

Я работаю в кафке с K SQL. Я хотел бы узнать последнюю строку в течение 5 минут в другом DEV_NAME (ROWKEY). Поэтому я создал потоковую и агрегированную таблицу для дальнейшего объединения.

Ниже K SQL я создал таблицу для определения последней строки в течение 5 минут для различных DEV_NAME

CREATE TABLE TESTING_TABLE  AS
SELECT  ROWKEY AS DEV_NAME, max(ROWTIME) as LAST_TIME 
    FROM TESTING_STREAM WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY ROWKEY;

Затем я хотел бы объединиться:

CREATE STREAM TESTING_S_2 AS 
  SELECT *
    FROM TESTING_S  S
        INNER JOIN TESTING_T T
        ON    S.ROWKEY = T.ROWKEY
    WHERE  
    S.ROWTIME = T.LAST_TIME;

Однако произошла ошибка:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to the actual key type (key type: org.apache.kafka.connect.data.Struct). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Должно быть, функция WINDOW TUMBLING изменила мой стиль ROWKEY

(e.g. DEV_NAME_11508 -> DEV_NAME_11508 : Window{start=157888092000 end=-}       

Поэтому, не устанавливая Serdes, я мог бы преобразовать из таблицы в поток и установить PARTITION BY DEV_NAME?

...