Я пытаюсь настроить распределенный кеш, используя Apache Ignite с Scala.
После настройки кэша я могу помещать и получать элементы, зная ключ, но запросы SQL любого типа всегда возвращают курсор с нулевым итератором.
Вот как я настраиваю свой кеш (обратите внимание, что это делается до запуска ignition.start):
def setupTelemetryCache(): CacheConfiguration[TelemetryKey, TelemetryValue] = {
val dataRegionName = "persistent-region"
val cacheName = "telemetry-cache"
// This object is required to perform SQL queries over custom key object
val queryEntity = new QueryEntity("TelemetryKey", "TelemetryValue")
val fields: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]
fields.put("deviceId", classOf[String].getName)
fields.put("metricName", classOf[String].getName)
fields.put("timestamp", classOf[String].getName)
queryEntity.setFields(fields)
val keyFields: util.HashSet[String] = new util.HashSet[String]()
keyFields.add("deviceId")
keyFields.add("metricName")
keyFields.add("timestamp")
queryEntity.setKeyFields(keyFields)
queryEntity.setIndexes(Collections.emptyList[QueryIndex]())
new CacheConfiguration()
.setName(cacheName)
.setDataRegionName(dataRegionName)
.setCacheMode(CacheMode.PARTITIONED) // Data is split among nodes
.setBackups(1) // each partition has 1 backup
.setIndexedTypes(classOf[String], classOf[TelemetryKey]) // Index by ID
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC) // Faster, clients do not wait for cache
// synchronization. Consistency issues?
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) // Allows transactional query
.setQueryEntities(Collections.singletonList(queryEntity))
}
А это код моего TelemetryKey:
case class TelemetryKey private (
@(AffinityKeyMapped @field)
@(QuerySqlField@field)(index = true)
deviceId: String,
@(QuerySqlField@field)(index = false)
metricName: String,
@(QuerySqlField@field)(index = true)
timestamp: String) extends Serializable
И TelemetryValue:
class TelemetryValue private(valueType: ValueTypes.Value, doubleValue: Option[Double],
stringValue: Option[String],
longValue: Option[Long]) extends Serializable
Примером запроса SQL, который мне нужно выполнить, может быть «Выбрать * из CACHE, где deviceId = 'dev1234'», и я ожидаю получить все Cache.Entry [TelemetryKey, TelemetryValue] того же идентификатора устройства.
Вот как я выполняю запрос:
private def sqlQuery(query: SqlQuery[TelemetryKey, TelemetryValue]):
QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
cache.query(query)
}
def getEntries(ofDeviceId: String):
QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryKey], "deviceId = ?")
sqlQuery(q.setArgs(ofDeviceId))
}
Даже при изменении тела запроса я получаю объект-курсор, который является пустым. Я даже не могу выполнить запрос «Выбрать *».
Спасибо за помощь