Сохранение KEY всех потоков таким же, как у исходного потока - PullRequest
0 голосов
/ 09 мая 2020

Я получаю данные в источник c с ключами и значениями в схеме AVRO. У меня есть еще 2 потока в конвейере ниже по потоку, прежде чем я отправлю данные в MySQL. Но поскольку я использую функции из K SQL, ключ из источника не отображается в нисходящие потоки.

Ключи составного типа. Есть ли способ сделать некоторые преобразования значений ключей и сохранить KEYS в нисходящих потоках?

Я пробовал несколько способов и знаю, что при использовании оператора SELECT AS во время оператора CREATE STREAM ключ KEY останется неизменным. но любые преобразования удаляют KEY из потоков.

Пример:

-- source stream
CREATE STREAM TEST_1 
  (COL1 STRING, COL2 STRING, COL3 STRING) 
  WITH (KAFKA_TOPIC='TEST_1', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO');

-- Second stream 
CREATE STREAM TEST_2 
   WITH (KAFKA_TOPIC='TEST_2', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='AVRO') AS 
   SELECT CLEAR(COL1) AS COL1, CLEAR(COL2) AS COL2, CLEAR(COL3) AS COL3 
     FROM TEST_1;

Здесь TEST_1 взят из базы данных. Он имеет составные ключи (COL1, COL2).

Но когда я создаю поток TEST_2, составные ключи не отражаются в TEST_1. Мне нужны ключи для записей надгробий для операции удаления в MySQL. С клавишами производятся некоторые операции по чистке.

Я использую для этого конфлюэнт Кафки. Операции вставки и обновления не вызывают никаких проблем, поскольку у меня было значение SMT для ключа в соединителях приемника.

Здесь CLEAR () - это настраиваемый UDF. Есть ли способ добиться этого?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...