Материализация KTable никогда не заканчивается, и вы также не можете "вызывать" to()
.
Когда вы используете API Streams, вы "соединяете" DAG операторов.Фактические вызовы методов, не запускают никаких вычислений, но изменяют DAG операторов.
Только после того, как вы начнете вычисление через KafkaStreams#start()
, данные обрабатываются.Обратите внимание, что все указанные вами операторы будут выполняться непрерывно и одновременно после начала вычислений.
Не существует «конца вычислений», потому что ожидается, что ввод будет неограниченным / бесконечным, так как вышестоящее приложение может записывать новые данные в разделы ввода в любое время.Таким образом, ваша программа никогда не завершается сама собой.При необходимости вы можете остановить вычисление с помощью KafkaStreams#close()
.
Во время выполнения вы не можете изменить DAG.Если вы хотите изменить его, вам нужно остановить вычисление и создать новый экземпляр KafkaStreams
, который будет принимать измененную группу DAG в качестве входных данных
Последующие действия:
Да,Вы должны думать о KTable как о «версионной таблице», которая развивалась с течением времени при обновлении записей.Таким образом, все обновления записываются в раздел журнала изменений и отправляются в нисходящем направлении в виде записей изменений (обратите внимание, что KTables также выполняют некоторое кэширование для «дедупликации» последовательных обновлений одного и того же ключа: см. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html).
будет использовать все доступные записи для одного и того же ключа, запрашивающего из темы, что приведет к дублированию.
Я бы не рассматривал их как "дубликаты", а как updates .И да, приложение должно уметь правильно обрабатывать эти обновления.
, если у нас есть способ запланировать материализацию, это поможет избежать дубликатов.
Материализация - это непрерывный процесс, и таблица KTable обновляется всякий раз, когда новые входные записи доступны в теме ввода и обрабатываются. Таким образом, в любой момент времени может быть обновление для определенного ключа. Таким образом, даже если у вас есть полный контроль наддля отправки обновлений в раздел журнала изменений и / или в нисходящий поток позже может появиться новое обновление. Такова природа потоковой обработки..
Кроме того, уменьшите объем данных, сохраняемых в теме (увеличив объем хранилища), увеличьте сетевой трафик, добавьте дополнительные затраты на процесс сжатия, чтобы очистить его.
Как уже упоминалось выше, для экономии ресурсов используется кэширование.
Можно ли улучшить API, чтобы разрешить контролируемую материализацию?
Если предоставляемая семантика KTable не делаетЧтобы удовлетворить ваши требования, вы всегда можете написать собственный оператор в виде Processor
или Transformer
, присоединить к нему хранилище значений ключей и реализовать все, что вам нужно.