Автоинкремент столбца в ksqlDB? - PullRequest
0 голосов
/ 17 июня 2020

В настоящее время я использую этот процесс (см. Ниже) для создания автоматически увеличивающегося столбца в ksqlDB. Но теперь мне интересно, есть ли при таком подходе условия гонки или другие проблемы с синхронизацией. Это хороший способ создать столбец с автоинкрементом в ksqlDB? Если нет, есть ли лучший способ? создайте два потока:

CREATE STREAM dest (ROWKEY INT KEY, i INT, x INT) WITH (kafka_topic='test_dest', value_format='json', partitions=1);
CREATE STREAM src (x INT) WITH (kafka_topic='test_src', value_format='json', partitions=1);

Затем создайте материализованное представление, которое будет содержать максимальное значение потока назначения.

CREATE TABLE dest_maxi AS SELECT MAX(i) AS i FROM dest GROUP BY 1;

Нам нужно иметь возможность присоединиться к исходному потоку к материализованному представлению. Для этого мы создадим еще один промежуточный поток с фиктивным столбцом one, который всегда установлен на 1, что мы и сгруппировали в материализованном представлении:

CREATE STREAM src_one AS SELECT x, 1 AS one FROM src;
INSERT INTO dest SELECT COALESCE(dest_maxi.i,0)+1 AS i, src_one.x AS x FROM src_one LEFT JOIN dest_maxi ON src_one.one = dest_maxi.ROWKEY PARTITION BY COALESCE(dest_maxi.i,0)+1 EMIT CHANGES;

Теперь вы можете вставлять значения в поток src и наблюдайте, как они появляются в потоке dest с автоматически увеличивающимися идентификаторами.

1 Ответ

0 голосов
/ 17 июня 2020

Не думаю, что ваш подход сработает. kslqDB не дает никаких гарантий в порядке обработки записей по двум разным запросам. В вашем случае это означает отсутствие гарантии того, что

CREATE TABLE dest_maxi AS <query>;

будет запускаться и обновлять dest_maxi перед запуском

INSERT INTO dest <query>;

. Следовательно, я думаю, вы столкнетесь с проблемами.

Похоже, вы пытаетесь взять поток чисел, например,

1234
24746
24848
4947
34

И добавить столбец с автоматически увеличивающимся идентификатором, чтобы результат должен выглядеть так:

1, 1234
2, 24746
3, 24848
4, 4947
5, 34

Что-то вроде этого должно дать вам то, что вы хотите:

-- source stream of numbers:
CREATE STREAM src (
     x INT
  ) WITH (
    kafka_topic='test_src', 
    value_format='json'
  );

-- intermediate 'table' of numbers and current count:
CREATE TABLE with_counter 
   WITH (partitions = 1) AS
   SELECT
      1 as k,
      LATEST_BY_OFFSET(x) as x,
      COUNT(1) AS id
   FROM src
   GROUP BY 1


-- if you need this back as a stream in ksqlDB you can run:
CREATE STREAM dest (
     x INT,
     id BIGINT
   ) WITH (
     kafka_topic='WITH_COUNTER',
     value_format='json'
   );

UDAF вычисляют значения для каждого ключа, поэтому мы группируем их по константе, обеспечивая все входные строки объединяются в один ключ (и разделение - так что это плохо масштабируется!).

Мы используем COUNT для подсчета количества видимых строк, поэтому его вывод автоматически увеличивается, и мы используем LATEST_BY_OFFSET, чтобы захватить текущее значение x в нашу таблицу.

Журнал изменений таблицы with_counter будет содержать желаемый результат, только с постоянным ключом 1:

1 -> 1, 1234
1 -> 2, 24746
1 -> 3, 24848
1 -> 4, 4947
1 -> 5, 34

Мы повторно импортируем это в ksqlDB как поток dest. Который вы можете использовать как обычно. Если вы хотите топи c без ключа, вы можете просто запустить:

CREATE STREAM without_key AS SELECT * FROM dest;
...