Я получаю события от Kafka
и сохраняю в Cassandra
. Синтаксический анализ json
, который содержит поля eventID, sessionID, timestamp, userID
для создания столбцов для таблицы Cassandra
, которая выглядит следующим образом:
cassandra@cqlsh> CREATE TABLE mydata.events (
... "event_date" date,
... "eventID" text,
... "userID" text,
... timestamp timeuuid,
... "sessionID" text,
... "fullJson" text,
... PRIMARY KEY ("event_date", timestamp, "sessionID")
и в коде:
case class cassandraFormat(
eventID: String,
sessionID: String,
timeuuid: UUID, // timestamp as timeuuid
userID: String,
event_date: LocalDate, // YYYY-MM-dd format
fullJson: String // full json from Kafka
)
Мне нужно добавить столбец timestamp
как timeuuid
. Поскольку я анализирую с json
, извлек все значения из заголовка и создал столбцы следующим образом:
val allJson = rdd.
map(x => {
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
//use serialization default to format a Map to JSON
(x, Serialization.write(x))
}).
filter(x => x._1 isDefinedAt "header").
map(x => (x._1("header"), x._2)).
filter(x => (x._1 isDefinedAt "userID") &&
(x._1 isDefinedAt "eventID") &&
(x._1 isDefinedAt "sessionID") &&
(x._1 isDefinedAt "timestamp").
map(x => cassFormat(x._1("eventID").toString,
x._1("sessionID").toString,
com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
x._1("userID").toString,
com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
x._2))
Эта часть:
com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)
генерирует ошибку
java.lang.NumberFormatException: для входной строки:
«2019-05-09T09: 00: 52.553 + 0000» в
java.lang.NumberFormatException.forInputString (NumberFormatException.java:65)
Даже попробовал:
java.util.UUID.fromString(x._1("timestamp").toString
,
также генерирует ту же ошибку.
Как правильно преобразовать / преобразовать timestamp
в timeuuid
и вставить в Cassandra
через искровое задание