Позвольте мне принять сообщение (в формате JSON) вашей темы Кафки, например:
{
"customer_id":value,
"customer_name":value,
"purchase_id":value,
"purchase_desc":value,
"product_id":value,
"product_name":value
}
И предположим, что название темы my_topic
.
Так что вы можете сделать так:
Для customer
ПОТОК:
CREATE STREAM customer (
-- which 'customer_id' match the JSON key
customer_id BIGINT,
customer_name VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'my_topic'
);
Для purchase
Таблица:
CREATE STREAM purchase (
purchase_id BIGINT,
purchase_desc VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'my_topic'
);
Для product
Таблица:
CREATE STREAM product (
product_id BIGINT,
product_name VARCHAR
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'my_topic'
);