Пишите в Cassandra со временем записи, используя фрейм данных в искре - PullRequest
0 голосов
/ 02 июля 2018

У меня есть следующий код: -

  val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
  val collection = kafkaStream.map(_._2).map(parser)
    collection.foreachRDD(rdd =>
      {
        if (!rdd.partitions.isEmpty) {
          try {
            val dfs = rdd.toDF() 
dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "tablename", "keyspace" -> "dbname"))
              .mode(SaveMode.Append).save()
          } catch {
            case e: Exception => e.printStackTrace
          }
        } else {
          println("blank rdd")
        }
      })

В приведенном выше примере я сохраняю потоковую передачу искр на кассандре, используя датафрейм. Теперь я хочу, чтобы у каждой строки df было свое время записи, подобное этой команде -

insert into table (imei , date , gpsdt ) VALUES ( '1345','2010-10-12','2010-10-12 10:10:10') USING TIMESTAMP 1530313803922977;

Таким образом, время записи каждой строки должно быть равно столбцу gpsdt этой строки. При поиске я нашел эту ссылку, но она показывает пример RDD, я хочу аналогичный вариант использования dataframe - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md Любые предложения, Спасибо

1 Ответ

0 голосов
/ 02 июля 2018

Насколько мне известно, в версии DataFrame такой функциональности нет (есть соответствующий JIRA: https://datastax -oss.atlassian.net / browse / SPARKC-416 ). Но у вас все равно есть RDD, который вы конвертируете в DataFrame - почему бы не использовать saveToCassandra, как описано в приведенной вами ссылке?

P.S. у вас могут возникнуть проблемы с производительностью при проверке на пустоту (http://www.waitingforcode.com/apache-spark/isEmpty-trap-spark/read)

...