У меня проблема с KSQL: потеря данных во время потока обновлений (завершение потока запросов и отбрасывания, создание нового потока) и публикация данных в «MainTopic».
Моя архитектура KSQL:
MAIN_STREAM ----> CONDITION_STREAM
Я потерял данные во время запроса завершения и отбрасывания потока до создания нового CONDITION_STREAM.
Может предложить способ обновления нового CONDITION_STREAM при публикации данных в MainTopic и продолжении потребленияданные во время завершения запроса и отбрасывания потока.
Прости мои навыки английского языка. Спасибо.
Я пытаюсь использовать 'auto.offset.reset' = 'самое раннее' для CONDITION_STREAM, но при этом используются все данные в MainTopic из MAIN_STREAM.
ШАГ 1: создать MAIN_STREAM из основной темыи поток условий для темы вывода
CREATE STREAM MAIN_STREAM (event_id VARCHAR , payload VARCHAR) WITH (KAFKA_TOPIC='MainTopic', VALUE_FORMAT='json');
ШАГ 2: создать данные фильтра CONDITION_STREAM из MAIN_STREAM
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from MAIN_STREAM WHERE event = "payment";
ШАГ 3: завершить идентификатор запроса CONDITION_STREAM
TERMINATE <CONDITION_STREAM_QUERY_ID>;
ШАГ 4: создать новый CONDITION_STREAM
DROP STREAM CONDITION_STREAM;
ШАГ 5: создать новый CONDITION_STREAM
CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from main_stream WHERE event = "something " AND EXTRACTJSONFIELD(payload, '$.status') = 'init';