У меня есть поток объектов, где я хотел бы рассчитать среднее значение поля в этом объекте, а затем сохранить это среднее обратно на объект. Я бы хотел, чтобы у меня было 5 минут падающего окна с задержкой 1 час. Я новичок в Kafka, поэтому мне интересно, если это правильный способ решения проблемы.
Сначала я создаю постоянное хранилище:
StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
Serdes.String(),
Serdes.Double());
streamsBuilder.addStateStore(averagesStoreSupplier);
Затем я вызываю мой трансформатор, используя:
otherKTable
.leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME),
myValueJoiner)
.to("outputTopic")
А вот и мой трансформатор:
public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {
private final String stateStoreName;
public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
this.stateStoreName = stateStoreName;
}
public ValueTransformerWithKey<String, MyObject, MyObject> get() {
return new ValueTransformerWithKey<>() {
private WindowStore<String, Double> averagesStore;
@Override
public void init(ProcessorContext processorContext) {
averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
}
@Override
public MyObject transform(String s, MyObject myObject) {
if (averagesStore != null) {
averagesStore.put(s, myObject.getNumber());
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now();
WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);
double sum = 0.0;
int size = 0;
while(itr.hasNext()) {
KeyValue<Long, Double> next = itr.next();
size++;
sum += next.value;
}
myObject.setNumber(sum / size);
}
return myObject;
}
@Override
public void close() {
if (averagesStore != null) {
averagesStore.flush();
}
}
};
}
}
У меня есть пара вопросов.
Во-первых, является ли способ, которым я определяю WindowStore, правильный способ формирования акробатического окна? Как бы я создал скачкообразное окно?
Во-вторых, внутри моего трансформера я получаю все предметы из магазина с начала времен до наших дней. Поскольку я определил его как 5-минутное окно и 1-часовое время хранения, означает ли это, что товары в магазине представляют собой снимок данных за 5 минут? Что здесь делает удержание?
У меня это работает над тривиальными случаями, но я не уверен, есть ли лучший способ сделать это с помощью агрегаций и объединений, или даже если я делаю это правильно. Кроме того, мне пришлось окружить извлечение получения хранилища попыткой catch, потому что инициализация вызывается несколько раз, и иногда я получаю Processor has no access to StateStore
исключение.