Как KSQL продолжает потреблять данные при обновлении Stream? - PullRequest
1 голос

У меня проблема с KSQL: потеря данных во время потока обновлений (завершение потока запросов и отбрасывания, создание нового потока) и публикация данных в «MainTopic».

Моя архитектура KSQL:

MAIN_STREAM ----> CONDITION_STREAM

Я потерял данные во время запроса завершения и отбрасывания потока до создания нового CONDITION_STREAM.

Может предложить способ обновления нового CONDITION_STREAM при публикации данных в MainTopic и продолжении потребленияданные во время завершения запроса и отбрасывания потока.

Прости мои навыки английского языка. Спасибо.

Я пытаюсь использовать 'auto.offset.reset' = 'самое раннее' для CONDITION_STREAM, но при этом используются все данные в MainTopic из MAIN_STREAM.

ШАГ 1: создать MAIN_STREAM из основной темыи поток условий для темы вывода

CREATE STREAM MAIN_STREAM (event_id VARCHAR , payload VARCHAR) WITH (KAFKA_TOPIC='MainTopic', VALUE_FORMAT='json');

ШАГ 2: создать данные фильтра CONDITION_STREAM из MAIN_STREAM

CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from MAIN_STREAM WHERE  event = "payment";

ШАГ 3: завершить идентификатор запроса CONDITION_STREAM

TERMINATE <CONDITION_STREAM_QUERY_ID>;

ШАГ 4: создать новый CONDITION_STREAM

DROP STREAM CONDITION_STREAM;

ШАГ 5: создать новый CONDITION_STREAM

CREATE STREAM CONDITION_STREAM WITH (KAFKA_TOPIC='OutputTopic', VALUE_FORMAT='json') AS SELECT * from main_stream WHERE  event = "something " AND EXTRACTJSONFIELD(payload, '$.status') = 'init';
...