KSQL создает таблицу из потока для последних данных - PullRequest
0 голосов
/ 06 октября 2019

У меня есть тема под названием клиенты, и я создал для нее поток

CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');

Мой продюсер для темы customers создает ключ Integer и значение json. Но когда я вижу, что ключ строки устанавливается в какое-то двоичное значение

ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}

Теперь, если я создаю таблицу, это приводит к одной строке (может быть, потому что ключ строки такой же ??)

CREATE TABLE customers (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');

После поиска в Интернете я наткнулся на эту статью https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key и создал новый поток, перераспределив его по ключу

CREATE STREAM customers_stream2 AS \
 SELECT * FROM customers_stream \
 PARTITION BY customerId;

Итак, как мне создать таблицу споследние значения данных клиентов?

создание таблицы из потока приводит к ошибке

CREATE TABLE customers_2_table_active AS
  SELECT CUSTOMERID,ISACTIVE
  FROM customers_stream2;

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

Мне нужно последнее значение различных строк, такчто другой микросервис может запросить новую таблицу.

Заранее спасибо

1 Ответ

1 голос
/ 07 октября 2019

Повторный ввод, кажется, правильный подход, однако вы не можете преобразовать STREAM в TABLE напрямую.

Обратите внимание, что ваш повторно введенный поток customers_stream2 записан в соответствующую тему. Следовательно, вы должны иметь возможность создать новую TABLE из темы потока, чтобы получить последнее значение для каждого ключа.

...