Не думаю, что ваш подход сработает. kslqDB не дает никаких гарантий в порядке обработки записей по двум разным запросам. В вашем случае это означает отсутствие гарантии того, что
CREATE TABLE dest_maxi AS <query>;
будет запускаться и обновлять dest_maxi
перед запуском
INSERT INTO dest <query>;
. Следовательно, я думаю, вы столкнетесь с проблемами.
Похоже, вы пытаетесь взять поток чисел, например,
1234
24746
24848
4947
34
И добавить столбец с автоматически увеличивающимся идентификатором, чтобы результат должен выглядеть так:
1, 1234
2, 24746
3, 24848
4, 4947
5, 34
Что-то вроде этого должно дать вам то, что вы хотите:
-- source stream of numbers:
CREATE STREAM src (
x INT
) WITH (
kafka_topic='test_src',
value_format='json'
);
-- intermediate 'table' of numbers and current count:
CREATE TABLE with_counter
WITH (partitions = 1) AS
SELECT
1 as k,
LATEST_BY_OFFSET(x) as x,
COUNT(1) AS id
FROM src
GROUP BY 1
-- if you need this back as a stream in ksqlDB you can run:
CREATE STREAM dest (
x INT,
id BIGINT
) WITH (
kafka_topic='WITH_COUNTER',
value_format='json'
);
UDAF вычисляют значения для каждого ключа, поэтому мы группируем их по константе, обеспечивая все входные строки объединяются в один ключ (и разделение - так что это плохо масштабируется!).
Мы используем COUNT
для подсчета количества видимых строк, поэтому его вывод автоматически увеличивается, и мы используем LATEST_BY_OFFSET
, чтобы захватить текущее значение x
в нашу таблицу.
Журнал изменений таблицы with_counter
будет содержать желаемый результат, только с постоянным ключом 1
:
1 -> 1, 1234
1 -> 2, 24746
1 -> 3, 24848
1 -> 4, 4947
1 -> 5, 34
Мы повторно импортируем это в ksqlDB как поток dest
. Который вы можете использовать как обычно. Если вы хотите топи c без ключа, вы можете просто запустить:
CREATE STREAM without_key AS SELECT * FROM dest;