Kafka Stream: материализация KTable - PullRequest
0 голосов
/ 21 мая 2018

Как определить, когда материализация KTable в теме завершена?

Например, предположим, что KTable содержит несколько миллионов строк.Псевдокод ниже:

KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows

В какой-то момент времени я хотел запланировать поток для вызова следующего, который пишет в тему: kt.toStream (). To ("output_topic_name");

Я хотел убедиться, что все данные записаны как часть вышеуказанного вызова.Кроме того, после вызова вышеуказанного метода «to» можно ли его вызывать в следующем расписании ИЛИ будет ли первый вызов всегда оставаться активным?

Дополнительный вопрос:

Ограничения
1) Хорошо, я вижу, что kstream и ktable не ограничены / бесконечны после запуска kafkastream.Однако, материализация ktable (в сжатую тему) не будет отправлять несколько записей для одного и того же ключа в течение указанного периода.

Таким образом, если процесс уплотнения не попытается очистить их и сохранить только самый последний, нисходящее приложение будет использовать все доступные записи для одного и того же ключа, запрашивая из темы, вызывая дубликаты.Даже если процесс сжатия выполняет некоторый уровень очистки, всегда невозможно, чтобы в определенный момент времени были некоторые ключи, которые имеют более одной записи, поскольку процесс сжатия догоняет.

Я полагаюKTable будет иметь только одну запись для данного ключа в RocksDB.Если у нас есть способ запланировать материализацию, это поможет избежать дубликатов.Кроме того, уменьшите объем данных, сохраняемых в теме (увеличив объем хранилища), увеличьте сетевой трафик, добавьте дополнительные ресурсы для процесса сжатия, чтобы очистить его.

2) Возможно, ReadOnlyKeyValueStore позволит осуществлять управляемый поискиз магазина, но у него все еще нет способа запланировать поиск ключа, значения и запись в тему, что требует дополнительного кодирования.

Можно ли улучшить API, чтобы обеспечить контролируемую материализацию?

1 Ответ

0 голосов
/ 21 мая 2018

Материализация KTable никогда не заканчивается, и вы также не можете "вызывать" to().

Когда вы используете API Streams, вы "соединяете" DAG операторов.Фактические вызовы методов, не запускают никаких вычислений, но изменяют DAG операторов.

Только после того, как вы начнете вычисление через KafkaStreams#start(), данные обрабатываются.Обратите внимание, что все указанные вами операторы будут выполняться непрерывно и одновременно после начала вычислений.

Не существует «конца вычислений», потому что ожидается, что ввод будет неограниченным / бесконечным, так как вышестоящее приложение может записывать новые данные в разделы ввода в любое время.Таким образом, ваша программа никогда не завершается сама собой.При необходимости вы можете остановить вычисление с помощью KafkaStreams#close().

Во время выполнения вы не можете изменить DAG.Если вы хотите изменить его, вам нужно остановить вычисление и создать новый экземпляр KafkaStreams, который будет принимать измененную группу DAG в качестве входных данных

Последующие действия:

Да,Вы должны думать о KTable как о «версионной таблице», которая развивалась с течением времени при обновлении записей.Таким образом, все обновления записываются в раздел журнала изменений и отправляются в нисходящем направлении в виде записей изменений (обратите внимание, что KTables также выполняют некоторое кэширование для «дедупликации» последовательных обновлений одного и того же ключа: см. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html).

будет использовать все доступные записи для одного и того же ключа, запрашивающего из темы, что приведет к дублированию.

Я бы не рассматривал их как "дубликаты", а как updates .И да, приложение должно уметь правильно обрабатывать эти обновления.

, если у нас есть способ запланировать материализацию, это поможет избежать дубликатов.

Материализация - это непрерывный процесс, и таблица KTable обновляется всякий раз, когда новые входные записи доступны в теме ввода и обрабатываются. Таким образом, в любой момент времени может быть обновление для определенного ключа. Таким образом, даже если у вас есть полный контроль наддля отправки обновлений в раздел журнала изменений и / или в нисходящий поток позже может появиться новое обновление. Такова природа потоковой обработки..

Кроме того, уменьшите объем данных, сохраняемых в теме (увеличив объем хранилища), увеличьте сетевой трафик, добавьте дополнительные затраты на процесс сжатия, чтобы очистить его.

Как уже упоминалось выше, для экономии ресурсов используется кэширование.

Можно ли улучшить API, чтобы разрешить контролируемую материализацию?

Если предоставляемая семантика KTable не делаетЧтобы удовлетворить ваши требования, вы всегда можете написать собственный оператор в виде Processor или Transformer, присоединить к нему хранилище значений ключей и реализовать все, что вам нужно.

...