Удаление сообщений из потока после их присоединения к Kafka Stream - PullRequest
0 голосов
/ 28 января 2019

Я использую Kafka Streams для соединения по двум различным типам сообщений, приходящих из двух разных тем Kafka.Я использую Скользящее временное окно .Эта политика окна сохраняет информацию из потоков для количества типа, которое не зависит от того, присоединилось ли сообщение к чему-либо или нет.

В случае очень высокой пропускной способности входных потоков, темы, созданные KafkaВыполнение объединения может расти очень быстро, занимая огромное количество дискового пространства.

Есть ли возможность удалить сообщения из вышеуказанной темы после присоединения?Таким образом, я буду предполагать, что сообщение объединяется не более одного раза с другим сообщением с тем же ключом.

Большое спасибо.

Ответы [ 2 ]

0 голосов
/ 01 февраля 2019

Вы можете уменьшить время хранения с помощью параметра until():

stream1.join(stream2, JoinWindows.of(...).until(/*put retention time here*/);

Указанное время хранения будет использоваться как для локальных хранилищ, так и для основной темы журнала изменений.Обратите внимание: если тема журнала изменений уже существует, изменение until() приведет к , а не обновлению конфигурации темы - вам необходимо обновить конфигурацию темы вручную.

0 голосов
/ 30 января 2019

0.11.0.0 представляет новый API deleteRecords в AdminClient и скрипт с именем kafka-delete-records, который можно использовать для удаления всех записей до заданного смещения.Вы можете использовать их для очистки ненужных данных.

Подробнее см. KIP-107 .

...