В вашем примере вы выполняете левостороннее соединение KStream-to-KTable. Семантика соединения Kafka Streams указывает, что (a) только данные, поступающие в KStream, будут инициировать вывод соединения, и (b) если при получении нового события KStream в KTable нет соответствующих данных (справа) на стороне соединения), тогда все равно будет производиться немедленный вывод соединения, но с null
для данных на стороне таблицы (т. е. не будет ждать поступления данных на стороне KTable).
Проблема 1: Как остановить или удержать соединение, пока ожидаемый ключ не появится в другой теме Например, KTable имеет ключ 100, но KStream еще не получил ключ 100, тогда мы должны повторить попытку присоединиться или удерживать KStream до получения ключа 100.
Во-первых, вы не можете остановить или удержать объединение с помощью встроенной функциональности Kafka Streams.
Во-вторых, конкретный пример, который вы предоставили, не будет реализован на практике, потому что (см. Выше) событие, прибывающее в таблицу KTable, не выдаст выход соединения. Только когда поступит событие в KStream, будет (a) выполнен поиск в KTable и (b) произведен выход соединения, независимо от результата (a).
Но что может произойти в соединении KStream-KTable LEFT, так это то, что происходит противоположный пример: KStream имеет ключ 100, но KTable еще не получил ключ 100. Как с этим справиться? Смотри ниже.
Проблема 2: есть ли способ поместить задержку или интервал в KStream (отложенный KStream) для получения сообщений с задержкой по времени или интервалу.
Да, есть способы сделать это. Но не с существующими операциями соединения в DSL Kafka Streams.
Вместо этого вы можете использовать API процессора Kafka Streams для реализации необходимой вам семантики соединения с помощью нескольких строк кода, а затем подключить эту функциональность к DSL для простого повторного использования.
Существует пример приложения, которое демонстрирует это, по совпадению для варианта использования, подобного вашему, приведенному выше: см. CustomStreamTableJoin
at https://github.com/confluentinc/kafka-streams-examples ( прямая ссылка на пример CustomStreamTableJoin для Confluent v5. 2.1 / Apache Kafka 2.2).