Spark scala Cassandra сохранить / обновить - PullRequest
0 голосов
/ 14 апреля 2020

У меня есть набор искровых данных объекта, который должен быть сохранен / обновлен в таблицу cassandra с именем 'offer'.

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String)
val offerDataset: Dataset[Offer] = ....

Я хочу сохранить или обновить вышеуказанное 'offerDataset' на кассандре с отметкой времени записи, определяемой полем "metadata_last_modified_source_time" из «предложение» сущность.

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("metadata_last_modified_source_time")))

Когда я пишу Кассандре, мне грозит следующее исключение. Может кто-нибудь помочь мне понять эту проблему. Получена та же ошибка с типами util.Date и Long для 'metadata_last_modified_source_time'.

com.datastax.driver.core.exceptions.InvalidTypeException: Value metadata_last_modified_source_time is of type bigint, not timestamp
at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:83)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:529)
at com.datastax.driver.core.AbstractData.set(AbstractData.java:536)
at com.datastax.driver.core.BoundStatement.set(BoundStatement.java:870)
at com.datastax.spark.connector.writer.BoundStatementBuilder.com$datastax$spark$connector$writer$BoundStatementBuilder$$bindColumnNull(BoundStatementBuilder.scala:59)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun$5.apply(BoundStatementBuilder.scala:83)

1 Ответ

1 голос
/ 15 апреля 2020

Я нашел решение после прохождения этого do c - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

Введено новое поле writeTime в Предложение case-класс, который должен отображаться в метку времени записи таблицы cassandra

case class Offer(offer_id: String, metadata_last_modified_source_time: java.sql.Timestamp, product_type: String, writeTime: sql.Date)

При создании offerDataSet я установил значение поля writeTime равным

val offerDataset: Dataset[Offer] = {....
   ....
    val writeTime = new Date(metadata_last_modified_source_time.getTime())
   ....
   ....
}

offerDataset.rdd.saveToCassandra("cassandra_keyspace", "cassandra_table", writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
...