Spark: Как настроить время записи при сохранении на Кассандре - PullRequest
1 голос
/ 15 апреля 2020

У меня есть существо, похожее на стол из кассандры. Я использую spark для сохранения / обновления данных в Cassandra. Здесь сущность Предложение класс дела

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, product_type: String, writeTime: util.Date)

val offerDataset: Dataset[Offer] = ....

Я сохраняю эти данные, как показано ниже

offerDataset.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> cassandraKeyspace, "table" -> tableName))
      .mode(SaveMode.Append)
      .save()

Схема таблицы Кассандры:

OFFER(offer_id, metadata_last_modified_source_time, product_type) 

Проблема заключается в настройке writeTime поля Предложить сущность в качестве метки времени записи при сохранении / обновлении таблицы cassandra. Это упомянуто здесь в dasastax - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md для настройки как

writetime=columnName

enter image description here

Что я не мог ' Я не понимаю, как должен выглядеть синтаксис.

Любая помощь может быть высоко оценена

1 Ответ

1 голос
/ 16 апреля 2020

Эта документация относится к альфа-версии Spark Cassandra Connector, поэтому следует ожидать, что что-то не работает. Как указано в документации - это опция таблицы, поэтому вы можете установить ее с помощью options. Вам нужно только переключиться с типов util.Date на Timestamp или Long - Spark SQL не поддерживает кодирование с типа Date.

со следующими определениями, все работает:

import java.time.Instant
import java.sql.Timestamp

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
  product_type: String, writeTime: Long)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
  Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF

или с Timestamp:

case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp, 
   product_type: String, writeTime: Timestamp)

val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
  Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF

, если мы используем следующую структуру таблицы:

create table test.wrt_test (
  offer_id text,
  metadata_last_modified_source_time timestamp,
  product_type text,
  primary key(offer_id, metadata_last_modified_source_time));

, тогда вы можете сохранить данные следующим образом ( только в 3.0 -alpha! ):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
    .option("writetime", "writeTime") // here you specify name of the column with time!
    .mode(SaveMode.Append).save()

, но она также отлично работает в текущей версии выпуска, если вы используете RDD API:

import com.datastax.spark.connector.writer._
offerDataset.rdd.saveToCassandra("test", "wrt_test", 
   writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))

После записи в обоих случаях вы получить следующее:

cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
      123 |    2020-04-16 07:28:38.905000+0000 |         test |              1243124234
      456 |    2020-04-16 07:28:38.905000+0000 |         test |             12431242366
(2 rows)
...