Данные в таблице ksql не являются постоянными - PullRequest
0 голосов
/ 06 декабря 2018

Мы используем confluent платформу на Ubuntu.У нас есть простые данные JSON, отправляемые через запрос cURL на сервер kafka-rest по теме kafka с именем «UE_Context».

Для этой темы создается поток kafka с именем "UE_CONTEXT_STREAM" с помощью следующей команды:

CREATE STREAM UE_Context_Stream (ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', VALUE_FORMAT='JSON');

Для этой темы создается таблица kafka с именем "UE_CONTEXT_TABLE" с помощью следующей команды:

CREATE TABLE UE_Context_Table ( registertime BIGINT, ue_key VARCHAR, ecgi VARCHAR) WITH (KAFKA_TOPIC='UE_Context', KEY='ue_key', VALUE_FORMAT='JSON');

У меня есть две строки данных, перекачиваемых по теме с использованием следующих команд cURL:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x1234", "ecgi" : "1234"}}]}' "http://localhost:8082/topics/UE_Context"  
curl -X POST -H "Accept: application/json" -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records":[{"key": "0x1234", "value":{"ue_key": "0x4321", "ecgi" : "4321"}}]}' "http://localhost:8082/topics/UE_Context"      

У меня есть запрос на выборку, ожидающий таблицы, как показано ниже:

ksql query

Этот запрос отображает информацию таблицы, когда данные JSON закачиваются в тему.Затем мы прекращаем закачивать данные JSON в тему, завершаем запрос выбора и завершаем запрос выбора.Если выбор выполняется позднее, информация о ранее заполненной таблице не отображается.Нет ли способа сохранить эти данные?Разъемы Kafka и использование БД могут быть вариантом.Но разве у kSQL нет временной памяти для хранения информации таблицы?

1 Ответ

0 голосов
/ 07 декабря 2018

выбор выполняется позднее, информация о ранее заполненной таблице не отображается.

В операторе выбора по умолчанию используются последние смещения темы

Если вы хотите просмотреть предыдущие данные, вам нужно установить смещение потребителя обратно в начало

SET 'auto.offset.reset'='earliest';

Также, как указано в документации (с акцентом)

Сам по себе оператор SELECT является непостоянным непрерывным запросом.Результат оператора SELECT не сохраняется в теме Kafka и печатается только в консоли KSQL.Не путайте постоянные запросы, созданные CREATE STREAM AS SELECT, с результатом потокового запроса из оператора SELECT.

...