Колонка метки времени Кассандры как timeuuid - PullRequest
0 голосов
/ 09 мая 2019

Я получаю события от Kafka и сохраняю в Cassandra. Синтаксический анализ json, который содержит поля eventID, sessionID, timestamp, userID для создания столбцов для таблицы Cassandra, которая выглядит следующим образом:

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")

и в коде:

case class cassandraFormat(
                       eventID: String, 
                       sessionID: String,
                       timeuuid: UUID, // timestamp as timeuuid
                       userID: String,
                       event_date: LocalDate, // YYYY-MM-dd format
                       fullJson: String // full json from Kafka
                     )

Мне нужно добавить столбец timestamp как timeuuid. Поскольку я анализирую с json, извлек все значения из заголовка и создал столбцы следующим образом:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp").
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

Эта часть:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

генерирует ошибку

java.lang.NumberFormatException: для входной строки: «2019-05-09T09: 00: 52.553 + 0000» в java.lang.NumberFormatException.forInputString (NumberFormatException.java:65)

Даже попробовал: java.util.UUID.fromString(x._1("timestamp").toString, также генерирует ту же ошибку. Как правильно преобразовать / преобразовать timestamp в timeuuid и вставить в Cassandra через искровое задание

Ответы [ 2 ]

0 голосов
/ 10 мая 2019

Мне удалось это сделать, преобразовав формат timestamp в dateTime и millis, затем сгенерировав uuid:

val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)

val allJson = rdd.
              map(x => {
                implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
                //use serialization default to format a Map to JSON
                (x, Serialization.write(x))
              }).
              filter(x => x._1 isDefinedAt "header").
              map(x => (x._1("header"), x._2)).
              filter(x => (x._1 isDefinedAt "userID") &&
                (x._1 isDefinedAt "eventID") &&
                (x._1 isDefinedAt "sessionID") &&
                (x._1 isDefinedAt "timestamp").
              map(x => {
                var millis: Long  = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
                try {
                  val dateStr: String = x._1("timestamp").asInstanceOf[String]
                  // timestamp from event json
                  // create DateTime from Timestamp string
                  val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
                  // create millis from DateTime
                  millis = dateTime.toInstant.toEpochMilli
                } catch {
                  case e: Exception =>
                    e.printStackTrace()
                }
                // generate timeuuid
                val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, random.nextLong)
                // generate eventDate
                val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
                cassFormat(x._1("eventID").toString,
                  x._1("sessionID").toString,
                  uuid,
                  x._1("userID").toString,
                  eventDate,
                  x._2)
              })
            allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
        }
      })

timestamp колонка в Кассандре теперь выглядит так: 58976340-7313-11e9-910d-60dce7513b94

0 голосов
/ 09 мая 2019

У вас есть строка, которая не является числом, и вы пытаетесь преобразовать ее в одну, используя toLong. таким образом, исключение.

Глядя на это , похоже, что вы можете получить UUID на основе некоторой метки времени, используя этот метод:

public static UUID getTimeUUID(long when)

Вам нужно будет проанализировать строку в DateTime или Instant, а затем передать миллисекунды этого DateTime / Instant в getTimeUUID

...