Как интегрировать данные таблиц MySql в поток Ksql или таблицы? - PullRequest
0 голосов
/ 23 октября 2018

Я пытаюсь построить конвейер данных от MySql до Ksql .

Сценарий использования: источником данных является MySql.Я создал таблицу в MySql.

Я использую

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties  ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties 

для запуска автономного соединителя.И он работает нормально.

Я запускаю получателя с именем темы, т.е.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning

, когда я вставляю данные в таблицу MySQL, я получаю результат и в получателе. Я создал KSQL Stream как будет с тем же именем темы .Я ожидаю того же результата и в моем Kstream, но я не получаю никакого результата, когда делаю

select * from <streamName>

Конфигурация соединителя--source-quickstart-mysql.properties

    name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera

#comment=Which table(s) to include
table.whitelist=ftest

mode=incrementing
incrementing.column.name=id

topic.prefix=ftopic

Пример данных

  • MySql

1.) Создать базу данных:

CREATE DATABASE testDB;

2.) Использовать базу данных:

USE testDB;

3.) Создать таблицу:

    CREATE TABLE products (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512),
  weight FLOAT
);

4.) Вставить данные в таблицу:

    INSERT INTO products(id,name,description,weight)
  VALUES (103,'car','Small car',20);
  • KSQL

1.) Создать поток:

CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');

2.) Выбрать запрос:

Select * from pro_original;

Ожидаемый вывод

  1. Потребитель

получает данные, которые вставляются в таблицу MySQL.

Здесь я получаю данные в MySQL.

Ksql

Заполняются данные In-Stream, которые вставляются в таблицу Mysql и отражаются в теме Кафки.

Я не являюсьполучение ожидаемого результата в ksql

Помогите мне для этого конвейера данных.

1 Ответ

0 голосов
/ 23 октября 2018

Ваши данные в формате AVRO, но в VALUE_FORMAT вместо AVRO вы определили DELIMITED.Важно указать KSQL формат значений, которые хранятся в теме.Следующие действия помогут вам:

CREATE STREAM pro_original_v2 \ 
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');

Данные, вставленные в тему kafka после выполнения

SELECT * FROM pro_original_v2;

, теперь должны отображаться в окне консоли ksql.

Вы можете посмотреть некоторые примеры Avro в KSQL здесь .

...