Kafka Streams: можно ли использовать метки времени для удаления, отличные от потоковой обработки? - PullRequest
0 голосов
/ 16 октября 2018

В Кафке Потоки 2.0.

Мой вариант использования: возможность (частичной) повторной обработки данных с созданным событием временем (определяемым пользователем из исходных данных и устанавливаемым через TimestampExtractor) с начала истории для приложения обработки, работающего параллельно с длиннойзапуск безостановочного приложения, отправка данных в выходные темы (оба приложения будут читать и отправлять в одни и те же выходные темы, которые используются для создания состояния).

Хранилища создаются из этих тем и включают управление окнами по сеансам.,Представьте себе, что я хотел бы дать один месяц хранения (для событий не по порядку и потребления) для этих тем - при повторной обработке, если я использую время события, я буду обрабатывать (и генерировать) старше, чем а-месячные события.

Использование message.timestamp.type=LogAppendTime в соответствии с KIP-32 , чтобы избежать удаления, приведет к созданию неправильных данных в хранилищах состояний (поскольку метки времени будут неправильными, и они будут использоваться, например, длясеанс).

Использование времени события, полное сохранение и применение данных очистки после завершения и использования повторной обработки утомительно, но поможет уменьшить размер тем - однако как насчет хранилищ, созданных из них?Например, чтобы удерживать данные во время повторной обработки, я должен был бы установить параметр until псевдо-бесконечности, но созданные DSL хранилища доступны (или должны быть) только для чтения, а не для манипулирования.

Итак, вернемся к названию:

  • Возможно ли (или предусмотрено) использовать разные метки времени для удаления, чем для потоковой обработки?
  • Есть ли другой способ лучше обойти это?

1 Ответ

0 голосов
/ 18 октября 2018

Для потоков, использование LogAppendTime для тем перераспределения является ошибочной конфигурацией.Также обратите внимание, что вы не потеряете данные в разделах перераспределения, потому что они создаются со временем хранения Integer.MAX_VALUE (ср. https://cwiki.apache.org/confluence/display/KAFKA/KIP-284%3A+Set+default+retention+ms+for+Streams+repartition+topics+to+Long.MAX_VALUE). Streams использует purgeData API для удаления данных из разделов перераспределения после их использования(см. https://issues.apache.org/jira/browse/KAFKA-6150), чтобы избежать неограниченного роста.

Таким образом, я бы рекомендовал перенастроить все темы перераспределения с помощью log.message.timestamp.type (т. е. конфигурация уровня темы).

...