Я получаю данные в источник c с ключами и значениями в схеме AVRO. У меня есть еще 2 потока в конвейере ниже по потоку, прежде чем я отправлю данные в MySQL. Но поскольку я использую функции из K SQL, ключ из источника не отображается в нисходящие потоки.
Ключи составного типа. Есть ли способ сделать некоторые преобразования значений ключей и сохранить KEYS в нисходящих потоках?
Я пробовал несколько способов и знаю, что при использовании оператора SELECT AS во время оператора CREATE STREAM ключ KEY останется неизменным. но любые преобразования удаляют KEY из потоков.
Пример:
-- source stream
CREATE STREAM TEST_1
(COL1 STRING, COL2 STRING, COL3 STRING)
WITH (KAFKA_TOPIC='TEST_1', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO');
-- Second stream
CREATE STREAM TEST_2
WITH (KAFKA_TOPIC='TEST_2', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO') AS
SELECT CLEAR(COL1) AS COL1, CLEAR(COL2) AS COL2, CLEAR(COL3) AS COL3
FROM TEST_1;
Здесь TEST_1 взят из базы данных. Он имеет составные ключи (COL1, COL2).
Но когда я создаю поток TEST_2, составные ключи не отражаются в TEST_1. Мне нужны ключи для записей надгробий для операции удаления в MySQL. С клавишами производятся некоторые операции по чистке.
Я использую для этого конфлюэнт Кафки. Операции вставки и обновления не вызывают никаких проблем, поскольку у меня было значение SMT для ключа в соединителях приемника.
Здесь CLEAR () - это настраиваемый UDF. Есть ли способ добиться этого?