У меня есть две темы, к которым я хотел бы присоединиться, а затем запросить объединение для получения последних результатов. Я следил за документами по Create a ksqlDB Table from a ksqlDB Stream
из здесь .
Вот что я делаю:
CREATE TABLE CATALOGUE_TABLE
(title STRING)
WITH (KAFKA_TOPIC='catalogue-topic-test',
VALUE_FORMAT='AVRO');
CREATE TABLE SCHEDULE_TABLE
(fromInstant STRING,
toInstant STRING)
WITH (KAFKA_TOPIC='schedule-topic-test',
VALUE_FORMAT='AVRO');
будьте уверены, что обе основные темы есть ключи для всех их записей. Затем я присоединяюсь к ним вот так:
CREATE TABLE MYTABLE AS
SELECT c.title, s.fromInstant, s.toInstant
FROM CATALOGUE_TABLE c
INNER JOIN SCHEDULE_TABLE s
ON s.ROWKEY = c.ROWKEY
EMIT CHANGES;
Я не уверен, что у меня получится. что бы это ни было, я могу запустить следующее:
select * from MYTABLE EMIT CHANGES;
и увидеть все обновления на нем со всеми дубликатами. это в основном поток. Теперь, если я выполню следующее:
select * from MYTABLE WHERE ROWKEY='12';
, чтобы получить последнее обновление с id = 12, я получу:
Таблица 'MYTABLE' не материализована. Обратитесь к https://cnfl.io/queries для получения информации о типах запросов. Если вы ...
, а остальная часть вывода обрезана, поэтому я не вижу, что он пытается сказать. Я предполагаю, что я как-то делаю что-то не так в MYTABLE.
Я думаю, что мне не хватает groupBy, который должен был бы быть ответственным за избавление от всех записей с повторяющимися идентификаторами, но я не могу понять, что мне нужно поместить туда и нужно ли мне это делать только на уровне MYTABLE, или если это должно быть сделано на всех трех таблицах.