У меня есть следующий код: -
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
val collection = kafkaStream.map(_._2).map(parser)
collection.foreachRDD(rdd =>
{
if (!rdd.partitions.isEmpty) {
try {
val dfs = rdd.toDF()
dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "tablename", "keyspace" -> "dbname"))
.mode(SaveMode.Append).save()
} catch {
case e: Exception => e.printStackTrace
}
} else {
println("blank rdd")
}
})
В приведенном выше примере я сохраняю потоковую передачу искр на кассандре, используя датафрейм. Теперь я хочу, чтобы у каждой строки df было свое время записи, подобное этой команде -
insert into table (imei , date , gpsdt ) VALUES ( '1345','2010-10-12','2010-10-12 10:10:10') USING TIMESTAMP 1530313803922977;
Таким образом, время записи каждой строки должно быть равно столбцу gpsdt этой строки. При поиске я нашел эту ссылку, но она показывает пример RDD, я хочу аналогичный вариант использования dataframe - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md Любые предложения, Спасибо