Как преобразовать последовательную числовую обработку данных таблицы Cassandra в параллельную в Spark? - PullRequest
2 голосов
/ 27 марта 2019

Мы выполняем некоторое математическое моделирование данных из таблицы Cassandra с использованием коннектора spark cassandra, и в настоящее время выполняется последовательное получение выходных данных.Как вы распараллеливаете это для более быстрого выполнения?

Я новичок в Spark, и я попробовал несколько вещей, но я не могу понять, как использовать табличные данные в функциях map, groupby, reduby.Если кто-то может помочь (с некоторыми фрагментами кода) объяснить, как разбирать табличные данные, это будет действительно полезно.

import org.apache.spark.sql.{Row, SparkSession}
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf



class SparkExample(sparkSession: SparkSession, pathToCsv: String) {
  private val sparkContext = sparkSession.sparkContext
  sparkSession.stop()
  val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host","127.0.0.1")                           
  .setAppName("cassandra").setMaster("local[*]")
  val sc = new SparkContext(conf)


 def testExample(): Unit = {




val KNMI_rdd = sc.cassandraTable ("dbks1","knmi_w")


val Table_count = KNMI_rdd.count()
val KNMI_idx = KNMI_rdd.zipWithIndex
val idx_key = KNMI_idx.map{case (k,v) => (v,k)}

var i = 0
var n : Int = Table_count.toInt


println(Table_count)

for ( i  <- 1 to n if i < n) {
  println(i)


  val Row = idx_key.lookup(i)

  println(Row)


  val firstRow = Row(0)



  val yyyy_var = firstRow.get[Int]("yyyy")
  val mm_var = firstRow.get[Double]("mm")
  val dd_var = firstRow.get[Double]("dd")
  val dr_var = firstRow.get[Double]("dr")
  val tg_var = firstRow.get[Double]("tg")
  val ug_var = firstRow.get[Double]("ug")
  val loc_var = firstRow.get[String]("loc")



  val pred_factor = (((0.15461 * tg_var) + (0.8954 * ug_var)) / ((0.0000451 * dr_var) + 0.0004487))




  println(yyyy_var,mm_var,dd_var,loc_var)
  println(pred_factor)

 }

 }
}

  //test data

// loc | yyyy | mm | dd | dr  | tg  | ug
//-----+------+----+----+-----+-----+----
// AMS | 2019 |  1 |  1 |  35 |   5 | 84
// AMS | 2019 |  1 |  2 |  76 |  34 | 74
// AMS | 2019 |  1 |  3 |  46 |  33 | 85
// AMS | 2019 |  1 |  4 |  35 |   1 | 84
// AMS | 2019 |  1 |  5 |  29 |   0 | 93
// AMS | 2019 |  1 |  6 |  32 |  25 | 89
// AMS | 2019 |  1 |  7 |  42 |  23 | 89
// AMS | 2019 |  1 |  8 |  68 |  75 | 92
// AMS | 2019 |  1 |  9 |  98 |  42 | 86
// AMS | 2019 |  1 | 10 |  92 |  12 | 76
// AMS | 2019 |  1 | 11 |  66 |   0 | 71
// AMS | 2019 |  1 | 12 |  90 |  56 | 85
// AMS | 2019 |  1 | 13 |  83 | 139 | 90

Редактировать 1: Я устал от использования функции карты, и яв состоянии вычислить математические вычисления, как мне добавить ключи перед этими значениями, которые определены WeatherId?

            case class Weather( loc: String, yyyy: Int, mm: Int, dd: Int,dr: Double, tg: Double, ug: Double)
            case class WeatherId(loc: String, yyyy: Int, mm: Int, dd: Int)

                   val rows = dataset1
                                        .map(line => Weather(
                                              line.getAs[String]("loc"),
                                              line.getAs[Int]("yyyy"),
                                              line.getAs[Int]("mm"),
                                              line.getAs[Int]("dd"),
                                              line.getAs[Double]("dr"),
                                              line.getAs[Double]("tg"),
                                              line.getAs[Double]("ug")
                                                            ) )


                  val pred_factor   = rows
                                        .map(x => (( ((x.dr * betaz) + (x.tg * betay)) + (x.ug) * betaz)))

Спасибо

1 Ответ

0 голосов
/ 28 марта 2019

TL; DR; Используйте набор данных / набор данных вместо RDD.

Аргумент для DF над RDD длинный, но суть в том, что DF и их структурированные альтернативные DS превосходят RDD низкого уровня.

С помощью коннектора spark-cassandra вы можете настроить размер входного разбиения , который определяет размер размера раздела в искре, больше разделов, больше параллелизма.

val lastdf = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "words",
    "keyspace" -> "test" ,
    "cluster" -> "ClusterOne",
    "spark.cassandra.input.split.size_in_mb" -> 48 // smaller size = more partitions
    )
  ).load()
...