KStream присоединиться к Retry / Delayed KStream - PullRequest
1 голос
/ 08 мая 2019

Мы пытаемся реализовать вариант использования, который описан ниже, у нас есть проблемы с реализацией, которые мы стремимся преодолеть,

Вариант использования,

Мы пытаемся соединить KStream между двумя темами Kafka, сопоставляя KEY, присутствующий в сообщениях (JSON) обоих потоков. Также мы должны поддерживать последовательность сообщений в том виде, как она поступила в KStream из источника.

Сценарий есть, Если соответствующий ключ еще не поступил в какой-либо поток, мы должны прекратить или повторить соединение, пока ожидаемый ключ не появится в другой теме. Мы думали поместить непревзойденные записи обратно в KStream, но в этом случае последовательность не гарантируется.

Выпуск 1: Как остановить или удержать присоединение, пока ожидаемый ключ не появится в другой теме. Например, у KTable есть ключ 100, но KStream еще не получил ключ 100, тогда мы должны повторить попытку присоединиться или удерживать KStream, пока не придет ключ 100.

выпуск 2: Есть ли способ поместить Delay или Interval в KStream (Delayed KStream) для получения сообщений с задержкой по времени или интервалу.

Дополнительно мы должны построить Keyed KStream из неключевой темы (ключ будет установлен путем извлечения его из сообщения - JSON)

Java предпочтительнее, поскольку мы сделали POC для соединения KTable и KStream

KTable<String, String> leftStream = builder.table("stream1");
KStream<String, String> rightStream = builder.stream("stream2");
KStream<String, String> outstream = rightStream.leftJoin(leftStream, (orig_msg, description) -> {
         String new_msg = "";
            if (description != null) {
                  new_msg = orig_msg+"-->Matched--"+description;
            }else {
                new_msg = orig_msg+"-->UnMatched<--"+description;
            }
                return new_msg;
     });

Ответы [ 2 ]

0 голосов
/ 13 мая 2019

Спасибо, Micheal, когда я пробовал выше Stream Example, я сразу же получил проблему и KStream завершил работу ... (Вся тема, которую я создал с одним разделом) .. Кроме того, я должен глубоко погрузиться во внутренности KStream :-) ..

stream-client [custom-join -gration-test-4af19e3b-8773-4e75-814e-56ea37839a59] Переход состояния из состояния REBALANCING в PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams)

0 голосов
/ 09 мая 2019

В вашем примере вы выполняете левостороннее соединение KStream-to-KTable. Семантика соединения Kafka Streams указывает, что (a) только данные, поступающие в KStream, будут инициировать вывод соединения, и (b) если при получении нового события KStream в KTable нет соответствующих данных (справа) на стороне соединения), тогда все равно будет производиться немедленный вывод соединения, но с null для данных на стороне таблицы (т. е. не будет ждать поступления данных на стороне KTable).

Проблема 1: Как остановить или удержать соединение, пока ожидаемый ключ не появится в другой теме Например, KTable имеет ключ 100, но KStream еще не получил ключ 100, тогда мы должны повторить попытку присоединиться или удерживать KStream до получения ключа 100.

Во-первых, вы не можете остановить или удержать объединение с помощью встроенной функциональности Kafka Streams.

Во-вторых, конкретный пример, который вы предоставили, не будет реализован на практике, потому что (см. Выше) событие, прибывающее в таблицу KTable, не выдаст выход соединения. Только когда поступит событие в KStream, будет (a) выполнен поиск в KTable и (b) произведен выход соединения, независимо от результата (a).

Но что может произойти в соединении KStream-KTable LEFT, так это то, что происходит противоположный пример: KStream имеет ключ 100, но KTable еще не получил ключ 100. Как с этим справиться? Смотри ниже.

Проблема 2: есть ли способ поместить задержку или интервал в KStream (отложенный KStream) для получения сообщений с задержкой по времени или интервалу.

Да, есть способы сделать это. Но не с существующими операциями соединения в DSL Kafka Streams.

Вместо этого вы можете использовать API процессора Kafka Streams для реализации необходимой вам семантики соединения с помощью нескольких строк кода, а затем подключить эту функциональность к DSL для простого повторного использования.

Существует пример приложения, которое демонстрирует это, по совпадению для варианта использования, подобного вашему, приведенному выше: см. CustomStreamTableJoin at https://github.com/confluentinc/kafka-streams-examples ( прямая ссылка на пример CustomStreamTableJoin для Confluent v5. 2.1 / Apache Kafka 2.2).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...