Эта документация относится к альфа-версии Spark Cassandra Connector, поэтому следует ожидать, что что-то не работает. Как указано в документации - это опция таблицы, поэтому вы можете установить ее с помощью options
. Вам нужно только переключиться с типов util.Date
на Timestamp
или Long
- Spark SQL не поддерживает кодирование с типа Date
.
со следующими определениями, все работает:
import java.time.Instant
import java.sql.Timestamp
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
product_type: String, writeTime: Long)
val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", 1243124234L),
Offer("456", Timestamp.from(Instant.now()), "test", 12431242366L)).toDF
или с Timestamp
:
case class Offer(offer_id: String, metadata_last_modified_source_time: Timestamp,
product_type: String, writeTime: Timestamp)
val offerDataset = Seq(Offer("123", Timestamp.from(Instant.now()), "test", new Timestamp(1243124234L)),
Offer("456", Timestamp.from(Instant.now()), "test", new Timestamp(12431242366L))).toDF
, если мы используем следующую структуру таблицы:
create table test.wrt_test (
offer_id text,
metadata_last_modified_source_time timestamp,
product_type text,
primary key(offer_id, metadata_last_modified_source_time));
, тогда вы можете сохранить данные следующим образом ( только в 3.0 -alpha! ):
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
offerDataset.write.cassandraFormat("wrt_test", "test")
.option("writetime", "writeTime") // here you specify name of the column with time!
.mode(SaveMode.Append).save()
, но она также отлично работает в текущей версии выпуска, если вы используете RDD API:
import com.datastax.spark.connector.writer._
offerDataset.rdd.saveToCassandra("test", "wrt_test",
writeConf = WriteConf(timestamp = TimestampOption.perRow("writeTime")))
После записи в обоих случаях вы получить следующее:
cqlsh> select offer_id, metadata_last_modified_source_time, product_type, writetime(product_type) from test.wrt_test;
offer_id | metadata_last_modified_source_time | product_type | writetime(product_type)
----------+------------------------------------+--------------+-------------------------
123 | 2020-04-16 07:28:38.905000+0000 | test | 1243124234
456 | 2020-04-16 07:28:38.905000+0000 | test | 12431242366
(2 rows)