В кафке как трансформировать тему в таблицу?Мне нужно скопировать удаленную таблицу - PullRequest
0 голосов
/ 11 октября 2018

Я настроил соединение с базой данных и всю передачу данных по теме, потому что, когда я запускаю потребителя, он возвращает данные

Как я могу преобразовать эту тему в таблицу и сохранить данные внутри KSQL?

Большое спасибо

1 Ответ

0 голосов
/ 11 октября 2018

Вы не сохраняете данные в KSQL.KSQL - это просто движок для запроса и преобразования данных в Kafka.Источником запросов KSQL является раздел (ы) Kafka, а вывод запросов KSQL является либо интерактивным, либо обратно к другой теме kafka.

Если у вас есть данные в ваших темах Kafka - как это звучит, как у вас - тогда в KSQL запустите LIST TOPICS;:

ksql> LIST TOPICS;    

 Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
 _confluent-metrics          | false      | 12         | 1                  | 0         | 0
 asgard.demo.accounts        | false      | 1          | 1                  | 0         | 0

Чтобы просмотреть ваши темы Kafka.Оттуда выберите свою тему, и вы можете запустить PRINT 'my-topic' FROM BEGINNING;

ksql> PRINT 'asgard.demo.accounts' FROM BEGINNING;
Format:AVRO
10/11/18 9:24:45 AM UTC, null, {"account_id": "a42", "first_name": "Robin", "last_name": "Moffatt", "email": "robin@confluent.io", "phone": "+44 123 456 789", "address": "22 Acacia Avenue", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a081", "first_name": "Sidoney", "last_name": "Lafranconi", "email": "slafranconi0@cbc.ca", "phone": "+44 908 687 6649", "address": "40 Kensington Pass", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a135", "first_name": "Mick", "last_name": "Edinburgh", "email": "medinburgh1@eepurl.com", "phone": "+44 301 837 6535", "address": "27 Blackbird Lane", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}

, чтобы просмотреть ее содержимое.Нажмите Ctrl-C, чтобы отменить оператор PRINT и вернуться в командную строку.

Обратите внимание на Format на выходе оператора PRINT.Это формат сериализации ваших данных.

Если данные сериализованы в Avro, вы можете выполнить:

CREATE STREAM mydata WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');

Если это в формате JSON, вам также потребуется указать имена столбцов и типы данных

CREATE STREAM mydata (col1 INT, col2 VARCHAR) WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='JSON');

Теперь, когда вы «зарегистрировали» эту тему в KSQL, вы можете просмотреть ее схему с помощью DESCRIBE:

ksql> DESCRIBE mydata;

Name                 : MYDATA
 Field         | Type
-------------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 ACCOUNT_ID    | VARCHAR(STRING)
 FIRST_NAME    | VARCHAR(STRING)
 LAST_NAME     | VARCHAR(STRING)
 EMAIL         | VARCHAR(STRING)
 PHONE         | VARCHAR(STRING)
 ADDRESS       | VARCHAR(STRING)
 COUNTRY       | VARCHAR(STRING)
 CREATE_TS     | VARCHAR(STRING)
 UPDATE_TS     | VARCHAR(STRING)
 MESSAGETOPIC  | VARCHAR(STRING)
 MESSAGESOURCE | VARCHAR(STRING)
-------------------------------------------

, а затем использовать KSQL для запроса и обработки данных:

ksql> SET 'auto.offset.reset'='earliest';

ksql> SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';

Robin Moffatt | robin@confluent.io
Sidoney Lafranconi | slafranconi0@cbc.ca
Mick Edinburgh | medinburgh1@eepurl.com
Merrill Stroobant | mstroobant2@china.com.cn

Нажмите Ctrl-C, чтобы отменить запрос SELECT.

KSQL может сохранить это в новой теме Kafka:

CREATE STREAM UK_USERS AS SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';

Если вы снова перечислите свои темы KSQL, вы увидите новую созданную и заполненную:

ksql> LIST TOPICS;

 Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
 _confluent-metrics          | false      | 12         | 1                  | 0         | 0
 asgard.demo.accounts        | true       | 1          | 1                  | 2         | 2
 UK_USERS                    | true       | 4          | 1                  | 0         | 0
---------------------------------------------------------------------------------------------------------
ksql>

Каждое событие, поступающее в исходную тему (asgard.demo.accounts), читается и фильтруется KSQL и записывается в целевую тему (UK_USERS) на основе выполненного вами SQL.

Для получения дополнительной информациисм. учебники по синтаксису KSQL и .

Отказ от ответственности: я работаю в Confluent, компании, работающей над проектом с открытым исходным кодом KSQL.

...