Невозможно выполнить запрос Ignite SQL через кеш [CustomKey, CustomValue] в Scala. - PullRequest
0 голосов
/ 26 апреля 2018

Я пытаюсь настроить распределенный кеш, используя Apache Ignite с Scala. После настройки кэша я могу помещать и получать элементы, зная ключ, но запросы SQL любого типа всегда возвращают курсор с нулевым итератором.

Вот как я настраиваю свой кеш (обратите внимание, что это делается до запуска ignition.start):

def setupTelemetryCache(): CacheConfiguration[TelemetryKey, TelemetryValue] = {

val dataRegionName = "persistent-region"
val cacheName = "telemetry-cache"

// This object is required to perform SQL queries over custom key object
val queryEntity = new QueryEntity("TelemetryKey", "TelemetryValue")
val fields: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]
fields.put("deviceId", classOf[String].getName)
fields.put("metricName", classOf[String].getName)
fields.put("timestamp", classOf[String].getName)
queryEntity.setFields(fields)
val keyFields: util.HashSet[String] = new util.HashSet[String]()
keyFields.add("deviceId")
keyFields.add("metricName")
keyFields.add("timestamp")
queryEntity.setKeyFields(keyFields)
queryEntity.setIndexes(Collections.emptyList[QueryIndex]())

new CacheConfiguration()
  .setName(cacheName)
  .setDataRegionName(dataRegionName)
  .setCacheMode(CacheMode.PARTITIONED) // Data is split among nodes
  .setBackups(1) // each partition has 1 backup
  .setIndexedTypes(classOf[String], classOf[TelemetryKey])  // Index by ID
  .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC) // Faster, clients do not wait for cache
  // synchronization. Consistency issues?
  .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) // Allows transactional query
  .setQueryEntities(Collections.singletonList(queryEntity)) 
}

А это код моего TelemetryKey:

case class TelemetryKey private (
                              @(AffinityKeyMapped @field)
                              @(QuerySqlField@field)(index = true)
                              deviceId: String,
                              @(QuerySqlField@field)(index = false)
                              metricName: String,
                              @(QuerySqlField@field)(index = true)
                              timestamp: String) extends Serializable 

И TelemetryValue:

class TelemetryValue private(valueType: ValueTypes.Value, doubleValue: Option[Double],
                             stringValue: Option[String],
                             longValue: Option[Long]) extends Serializable

Примером запроса SQL, который мне нужно выполнить, может быть «Выбрать * из CACHE, где deviceId = 'dev1234'», и я ожидаю получить все Cache.Entry [TelemetryKey, TelemetryValue] того же идентификатора устройства.

Вот как я выполняю запрос:

 private def sqlQuery(query: SqlQuery[TelemetryKey, TelemetryValue]):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    cache.query(query)
  }

  def getEntries(ofDeviceId: String):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryKey], "deviceId = ?")
    sqlQuery(q.setArgs(ofDeviceId))
  }

Даже при изменении тела запроса я получаю объект-курсор, который является пустым. Я даже не могу выполнить запрос «Выбрать *».

Спасибо за помощь

Ответы [ 2 ]

0 голосов
/ 26 апреля 2018

Существует два способа настройки индексов и запрашиваемых полей.

  1. Конфигурация на основе аннотаций
    Ваши классы ключей и значений должны быть аннотированы @QuerySqlField следующим образом.

    case class TelemetryKey private (
    @(AffinityKeyMapped @field)
    @(QuerySqlField@field)(index = true)
    deviceId: String,
    @(QuerySqlField@field)(index = false)
    metricName: String,
    @(QuerySqlField@field)(index = true)
    timestamp: String) extends Serializable

После определения индексированных и запрашиваемых полей они должны быть зарегистрированы в движке SQL вместе с типами объектов, к которым они принадлежат.


    new CacheConfiguration()
    .setName(cacheName)
    .setDataRegionName(dataRegionName)
    .setCacheMode(CacheMode.PARTITIONED)
    .setBackups(1)
    .setIndexedTypes(classOf[TelemetryKey], classOf[TelemetryValue])
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)

UPD: Еще одна вещь, которая также должна быть исправлена ​​- это SqlQuery

  def getEntries(ofDeviceId: String):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryValue], "deviceId = ?")
    sqlQuery(q.setArgs(ofDeviceId))
  }
  1. Подход, основанный на QueryEntity
val queryEntity = new QueryEntity(classOf[TelemetryKey], classOf[TelemetryValue]);

    new CacheConfiguration()
    .setName(cacheName)
    .setDataRegionName(dataRegionName)
    .setCacheMode(CacheMode.PARTITIONED)
    .setBackups(1)
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
    .setQueryEntities(Collections.singletonList(queryEntity))
0 голосов
/ 26 апреля 2018

Короче говоря, вы должны предоставить полные имена классов JVM для QueryEntity.

Как в:это не декоративно - должно быть точное совпадение.

Еще лучше?Используйте setIndexedTypes() вместо setQueryEntities().Он позволяет вам передавать классы вместо строк и сканировать аннотации, которые у вас уже есть.

...