Я пытаюсь построить конвейер данных от MySql до Ksql .
Сценарий использования: источником данных является MySql.Я создал таблицу в MySql.
Я использую
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
для запуска автономного соединителя.И он работает нормально.
Я запускаю получателя с именем темы, т.е.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning
, когда я вставляю данные в таблицу MySQL, я получаю результат и в получателе. Я создал KSQL Stream как будет с тем же именем темы .Я ожидаю того же результата и в моем Kstream, но я не получаю никакого результата, когда делаю
select * from <streamName>
Конфигурация соединителя--source-quickstart-mysql.properties
name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
#comment=Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic
Пример данных
1.) Создать базу данных:
CREATE DATABASE testDB;
2.) Использовать базу данных:
USE testDB;
3.) Создать таблицу:
CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
4.) Вставить данные в таблицу:
INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
1.) Создать поток:
CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH \
(kafka_topic='proproducts', value_format='DELIMITED');
2.) Выбрать запрос:
Select * from pro_original;
Ожидаемый вывод
- Потребитель
получает данные, которые вставляются в таблицу MySQL.
Здесь я получаю данные в MySQL.
Ksql Заполняются данные In-Stream, которые вставляются в таблицу Mysql и отражаются в теме Кафки.
Я не являюсьполучение ожидаемого результата в ksql
Помогите мне для этого конвейера данных.