У меня есть приложение весенних облачных потоков kafka, которое перехватывает входящие данные, чтобы иметь возможность объединять две темы: ключи выбора, значения карт и агрегировать данные. Со временем потребительские лаги увеличиваются, и масштабирование за счет добавления нескольких экземпляров приложения мало помогает. С каждым разом потребительское отставание, похоже, увеличивается.
Я увеличил и уменьшил количество экземпляров с 1 до 18, но большой разницы не заметил. Количество сообщений, от которых оно отстает, продолжает увеличиваться каждые 5 секунд независимо от количества экземпляров
KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
.flatMap(flattenOriginalData())
.through("atl-mapped-original-sensor-data-repartition", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));
//#2. Save modelid and algorithm parts of the key of the errorscore topic and reduce the key
// to installationId:assetId:tagName
//Repartition ahead of time avoiding multiple repartition topics and thereby duplicating data
KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
.map(enrichWithModelAndAlgorithmAndReduceKey())
.through("atl-mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));
return enrichedErrorData
//#3. Join
.join(flattenedOriginalData, join(),
JoinWindows.of(
// allow messages within one second to be joined together based on their timestamp
Duration.ofMillis(1000).toMillis())
// configure the retention period of the local state store involved in this join
.until(Long.parseLong(retention)),
Joined.with(
Serdes.String(),
new MappedErrorScoreDataSerde(),
new MappedOriginalSensorDataSerde()))
//#4. Set instalation:assetid:modelinstance:algorithm::tag key back
.selectKey((k,v) -> v.getOriginalKey())
//#5. Map to ErrorScore (basically removing the originalKey field)
.mapValues(removeOriginalKeyField())
.through("atl-joined-data-repartition");
затем часть агрегации:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
.as(localStore.getStoreName());
// Set retention of changelog topic
materialized.withLoggingEnabled(topicConfig);
// Configure how windows looks like and how long data will be retained in local stores
TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));
// Processing description:
// 2. With the groupByKey we group the data on the new key
// 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
// 4. With reduce we determine the maximum value in the time window
// 5. Materialized will make it stored in a table
stream.groupByKey()
.windowedBy(configuredTimeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
}
private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
return timeWindows;
}
Я ожидаю, что увеличение количества экземпляров значительно уменьшит отставание потребителей.
Так что в этой настройке участвуют несколько тем, таких как:
* оригинальные данные датчика
* оценка ошибки
* kstream-joinother
* kstream-jointhis
* atl-mapped-original-sensor-data-repartition
* atl-mapped-error-score-data-repartition
* atl-join-data-repartition
Идея состоит в том, чтобы объединить исходные данные датчика с оценкой ошибки. Для перебора требуется тема atl-mapped- *. затем объединение будет использовать темы kstream *, и в результате в результате объединения заполняется atl-join-data-repartition. После этого агрегация также создает темы, но я оставляю это вне области видимости.
original-sensor-data
\
\
\ atl-mapped-original-sensor-data-repartition-- kstream-jointhis -\
/ atl-mapped-error-score-data-repartition -- kstream-joinother -\
/ \
error-score atl-joined-data-repartition
Поскольку кажется, что увеличение количества экземпляров больше не оказывает большого влияния, так как я представил темы объединения и сопоставления atl, мне интересно, возможно ли, чтобы эта топология стала собственным узким местом? , Из потребительского лага кажется, что у темы "Исходные данные датчика" и "Оценка ошибок" гораздо меньше потребительского лага по сравнению, например, с темами atl-mapped- *. Есть ли способ справиться с этим, удалив эти изменения, или это приводит к невозможности масштабирования?