У меня есть тема под названием клиенты, и я создал для нее поток
CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');
Мой продюсер для темы customers
создает ключ Integer и значение json. Но когда я вижу, что ключ строки устанавливается в какое-то двоичное значение
ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}
Теперь, если я создаю таблицу, это приводит к одной строке (может быть, потому что ключ строки такой же ??)
CREATE TABLE customers (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');
После поиска в Интернете я наткнулся на эту статью https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key и создал новый поток, перераспределив его по ключу
CREATE STREAM customers_stream2 AS \
SELECT * FROM customers_stream \
PARTITION BY customerId;
Итак, как мне создать таблицу споследние значения данных клиентов?
создание таблицы из потока приводит к ошибке
CREATE TABLE customers_2_table_active AS
SELECT CUSTOMERID,ISACTIVE
FROM customers_stream2;
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
Мне нужно последнее значение различных строк, такчто другой микросервис может запросить новую таблицу.
Заранее спасибо