Вы не сохраняете данные в KSQL.KSQL - это просто движок для запроса и преобразования данных в Kafka.Источником запросов KSQL является раздел (ы) Kafka, а вывод запросов KSQL является либо интерактивным, либо обратно к другой теме kafka.
Если у вас есть данные в ваших темах Kafka - как это звучит, как у вас - тогда в KSQL запустите LIST TOPICS;
:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | false | 1 | 1 | 0 | 0
Чтобы просмотреть ваши темы Kafka.Оттуда выберите свою тему, и вы можете запустить PRINT 'my-topic' FROM BEGINNING;
ksql> PRINT 'asgard.demo.accounts' FROM BEGINNING;
Format:AVRO
10/11/18 9:24:45 AM UTC, null, {"account_id": "a42", "first_name": "Robin", "last_name": "Moffatt", "email": "robin@confluent.io", "phone": "+44 123 456 789", "address": "22 Acacia Avenue", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a081", "first_name": "Sidoney", "last_name": "Lafranconi", "email": "slafranconi0@cbc.ca", "phone": "+44 908 687 6649", "address": "40 Kensington Pass", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a135", "first_name": "Mick", "last_name": "Edinburgh", "email": "medinburgh1@eepurl.com", "phone": "+44 301 837 6535", "address": "27 Blackbird Lane", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
, чтобы просмотреть ее содержимое.Нажмите Ctrl-C, чтобы отменить оператор PRINT
и вернуться в командную строку.
Обратите внимание на Format
на выходе оператора PRINT
.Это формат сериализации ваших данных.
Если данные сериализованы в Avro, вы можете выполнить:
CREATE STREAM mydata WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');
Если это в формате JSON, вам также потребуется указать имена столбцов и типы данных
CREATE STREAM mydata (col1 INT, col2 VARCHAR) WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='JSON');
Теперь, когда вы «зарегистрировали» эту тему в KSQL, вы можете просмотреть ее схему с помощью DESCRIBE
:
ksql> DESCRIBE mydata;
Name : MYDATA
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCOUNT_ID | VARCHAR(STRING)
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
PHONE | VARCHAR(STRING)
ADDRESS | VARCHAR(STRING)
COUNTRY | VARCHAR(STRING)
CREATE_TS | VARCHAR(STRING)
UPDATE_TS | VARCHAR(STRING)
MESSAGETOPIC | VARCHAR(STRING)
MESSAGESOURCE | VARCHAR(STRING)
-------------------------------------------
, а затем использовать KSQL для запроса и обработки данных:
ksql> SET 'auto.offset.reset'='earliest';
ksql> SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
Robin Moffatt | robin@confluent.io
Sidoney Lafranconi | slafranconi0@cbc.ca
Mick Edinburgh | medinburgh1@eepurl.com
Merrill Stroobant | mstroobant2@china.com.cn
Нажмите Ctrl-C, чтобы отменить запрос SELECT
.
KSQL может сохранить это в новой теме Kafka:
CREATE STREAM UK_USERS AS SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
Если вы снова перечислите свои темы KSQL, вы увидите новую созданную и заполненную:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | true | 1 | 1 | 2 | 2
UK_USERS | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------
ksql>
Каждое событие, поступающее в исходную тему (asgard.demo.accounts
), читается и фильтруется KSQL и записывается в целевую тему (UK_USERS
) на основе выполненного вами SQL.
Для получения дополнительной информациисм. учебники по синтаксису KSQL и .
Отказ от ответственности: я работаю в Confluent, компании, работающей над проектом с открытым исходным кодом KSQL.