Мы используем весенний облачный поток с Kafka 2.0.1 и используем InteractiveQueryService для получения данных из хранилищ. Есть 4 хранилища, которые сохраняют данные на диске после объединения данных. Код для топологии выглядит следующим образом:
@Slf4j
@EnableBinding(SensorMeasurementBinding.class)
public class Consumer {
public static final String RETENTION_MS = "retention.ms";
public static final String CLEANUP_POLICY = "cleanup.policy";
@Value("${windowstore.retention.ms}")
private String retention;
/**
* Process the data flowing in from a Kafka topic. Aggregate the data to:
* - 2 minute
* - 15 minutes
* - one hour
* - 12 hours
*
* @param stream
*/
@StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
public void process(KStream<String, SensorMeasurement> stream) {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(RETENTION_MS, retention);
topicConfig.put(CLEANUP_POLICY, "delete");
log.info("Changelog and local window store retention.ms: {} and cleanup.policy: {}",
topicConfig.get(RETENTION_MS),
topicConfig.get(CLEANUP_POLICY));
createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
}
private void createWindowStore(
LocalStore localStore,
Map<String, String> topicConfig,
KStream<String, SensorMeasurement> stream) {
// Configure how the statestore should be materialized using the provide storeName
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
.as(localStore.getStoreName());
// Set retention of changelog topic
materialized.withLoggingEnabled(topicConfig);
// Configure how windows looks like and how long data will be retained in local stores
TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));
// Processing description:
// The input data are 'samples' with key <installationId>:<assetId>:<modelInstanceId>:<algorithmName>
// 1. With the map we add the Tag to the key and we extract the error score from the data
// 2. With the groupByKey we group the data on the new key
// 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
// 4. With reduce we determine the maximum value in the time window
// 5. Materialized will make it stored in a table
stream
.map(getInstallationAssetModelAlgorithmTagKeyMapper())
.groupByKey()
.windowedBy(configuredTimeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
}
private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
return timeWindows;
}
/**
* Determine the max error score to keep by looking at the aggregated error signal and
* freshly consumed error signal
*
* @param aggValue
* @param newValue
* @return
*/
private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore newValue) {
if(aggValue.getErrorSignal() > newValue.getErrorSignal()) {
return aggValue;
}
return newValue;
}
private KeyValueMapper<String, SensorMeasurement,
KeyValue<? extends String, ? extends ErrorScore>> getInstallationAssetModelAlgorithmTagKeyMapper() {
return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + sensorMeasurement.getT(),
new ErrorScore(sensorMeasurement.getTs(), sensorMeasurement.getE(), sensorMeasurement.getO()));
}
}
Таким образом, мы материализуем агрегированные данные в четыре разных хранилища после определения максимального значения в конкретном окне для определенного ключа.
Обратите внимание, что срок хранения данных составляет два месяца, а политика очистки удаляется. Мы не сжимаем данные.
Размер отдельных хранилищ состояний на диске составляет от 14 до 20 ГБ данных.
Мы используем интерактивные запросы: https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#interactive-queries
В нашей настройке у нас есть 4 экземпляра нашего потокового приложения, которые будут использоваться как одна группа потребителей. Таким образом, каждый экземпляр будет хранить определенную часть всех данных в своем хранилище.
Кажется, все это работает хорошо. Пока мы не перезапустим один или несколько экземпляров и не дождемся, пока они снова станут доступными. Я ожидаю, что перезапуск приложения не займет много времени, но, к сожалению, это займет 1 час. Я предполагаю, что проблема вызвана количеством данных в комбинации восстановления государственных хранилищ, но я не уверен. Я ожидал бы, что, поскольку мы сохраняем данные хранилища состояний на постоянных томах за пределами контейнера, который выполняется на kubernetes, приложение получит последнее смещение от посредника и должно продолжать только с этой точки, так как ранее использованные данные уже существуют. в государственном магазине. К сожалению, я понятия не имею, как решить эту проблему.
Перезапуск нашего приложения запускает задачу восстановления:
-StreamThread-2] Restoring task 4_3's state store twelve-hours-error-score from beginning of the changelog anomaly-timeline-twelve-hours-error-score-changelog-3.
Этот процесс занимает довольно много времени. Почему он восстанавливается с самого начала и почему так долго? У меня есть auto.offset.reset, установленный на «самый ранний», но он используется только, когда смещение неизвестно, не так ли?
Вот мои настройки потока. Обратите внимание, что max.bytes.buffering установлен в 0. Я изменил это, но это не имело значения. Я также читал об ошибке с num.stream.threads, где> 1 вызывает проблемы, но также установка этого значения на 1 не улучшает скорость перезапуска.
2019-03-05 13:44:53,360 INFO main org.apache.kafka.common.config.AbstractConfig StreamsConfig values:
application.id = anomaly-timeline
application.server = localhost:5000
bootstrap.servers = [localhost:9095]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 0
client.id =
commit.interval.ms = 500
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class errorscore.raw.boundary.ErrorScoreTimestampExtractor
default.value.serde = class errorscore.raw.boundary.ErrorScoreSerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 2
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = ./state-store
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
Через некоторое время также регистрируются эти сообщения:
CleanupThread] Deleting obsolete state directory 1_1 for task 1_1 as 1188421ms has elapsed (cleanup delay is 600000ms).
Также кое-что следует отметить, я добавил следующий код, чтобы переопределить очистку по умолчанию при запуске и остановке, где удаляются хранилища по умолчанию:
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(false, false);
}
любая помощь будет оценена!