предположим, что я выполняю некоторую пользовательскую обработку при сохранении моих данных в Globalstore после извлечения данных из темы, т.е. я создаю пользовательский ключ из значения сообщения. Будет ли он восстанавливать Globalstore таким же образом после локального удаления состояния.
override def process(key: String, value: String): Unit = {
logger.info("telephonyUsersProcessorCounter = "+telephonyUsersProcessorCounter)
telephonyUsersProcessorCounter = telephonyUsersProcessorCounter +1
val telKey = processKey(key)
if (telKey.isDefined) {
val telValue = processValue(value)
if(telValue.isDefined ){
StreamConstants.teleStore.get.put(telKey.get,telValue.get)
val compositeKeyForNumber = telValue.get.enterpriseId + telValue.get.phoneNumber
val compositeKeyForDeviceName = telValue.get.enterpriseId +telValue.get.deviceName
val compositeKeyForNumberAndDeviceName = telValue.get.enterpriseId +telValue.get.phoneNumber+telValue.get.deviceName
val telCompositeKeyForNumber = StreamConstants.teleStore.get.get(compositeKeyForNumber)
val telCompositeKeyForDeviceName = StreamConstants.teleStore.get.get(compositeKeyForDeviceName)
val telCompositeKeyForNumberAndDeviceName = StreamConstants.teleStore.get.get(compositeKeyForNumberAndDeviceName)
if(null !=telCompositeKeyForNumber ){
if(telCompositeKeyForNumber.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
}
if(null != telCompositeKeyForDeviceName){
if(telCompositeKeyForDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
}
if(null != telCompositeKeyForNumberAndDeviceName){
if(telCompositeKeyForNumberAndDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
}
}else {
StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
}
context.forward(telKey.get, telValue.get.toJson.toString())
context.forward(compositeKeyForNumber, telValue.get.toJson.toString())
context.forward(compositeKeyForDeviceName, telValue.get.toJson.toString())
context.forward(compositeKeyForNumberAndDeviceName, telValue.get.toJson.toString())
}else {
StreamConstants.teleStore.get.put(telKey.get,null)
context.forward(telKey.get,null)
}
}
}
создание собственного ключа с использованием данных из значения сообщения вместо использования прямого ключа из темы.Предположим, я удалил свой локальный глобальный магазин.Что будет при восстановлении этого магазина из компактной темы?