KSQL - определение момента загрузки таблицы - PullRequest
0 голосов
/ 14 ноября 2018

Как определить, когда KSQL полностью загрузил мои данные из темы Kafka в мою таблицу?

ЦЕЛЬ: Возьмите 2 темы Kafka, объедините их и запишите результаты в новуюТема Kafka.

ПРИМЕР:

Я использую API отдыха Ksql для выполнения следующих команд.

CREATE TABLE MyTable (A1 VARCHAR, A2 VARCHAR) WITH (kafka_topic='topicA', key='A1', value_format='json');
CREATE STREAM MyStream (B1 varchar, B2 varchar) WITH (kafka_topic='topicB', value_format='json');
CREATE STREAM MyDestination WITH (Kafka_topic='topicC', PARTITIONS = 1, value_format='json') AS SELECT a.A1 as A1, a.A2 as A2, b.B1 as B1, b.B2 as B2 FROM  MyStream b left join MyTable a on a.A1 = b.B1;

ПРОБЛЕМА: topicC имеет только данные из topicB, и все объединенные значения являются нулевыми.

Несмотря на то, что я получаю статус SUCCESS от команды создания таблицы, похоже, что данные не полностью загружены в таблицу.Следовательно, результат 3-й команды имеет только данные из потока и не включает данные из таблицы.Если я искусственно задержу выполнение команды соединения, то в полученном разделе будут правильно данные из обоих разделов.Как я могу определить, когда моя таблица загружена, и безопасно ли выполнить команду соединения?

Ответы [ 2 ]

0 голосов
/ 29 ноября 2018

Таблицы в KSQL (и базовые потоки Kafka) имеют временное измерение, то есть эволюционируют во времени.Для соединения с таблицей потоков каждая запись потока объединяется с «правильной» версией таблицы (т. Е. Таблицы имеют версии по времени).

В следующей версии CP 5.1 вы можете «предварительно загрузить»таблицы, гарантируя, что все метки времени записи темы таблицы будут меньше, чем метки времени записи темы потока.Это говорит KSQL о том, что сначала необходимо обработать данные темы таблицы, но перед тем, как начать присоединение, передвигайте соответственно временную метку таблицы.

Для получения более подробной информации ознакомьтесь с: https://www.confluent.io/resources/streams-tables-two-sides-same-coin

0 голосов
/ 29 ноября 2018

Это действительно великий вопрос. На данный момент в KSQL нет способа автоматически выполнить объединение потоковой таблицы только после полной загрузки таблицы. Это действительно полезная функция. Более общая и связанная с этим проблема обсуждается здесь: https://github.com/confluentinc/ksql/issues/1751

...