У меня есть коллекция строк строк (растений, как показано ниже) из таблицы hbase, и я хочу создать функцию fetchData, которая возвращает данные rdd для строк строк из коллекции. Цель состоит в том, чтобы получить объединение RDD из метода fetchData для каждого элемента из коллекции растений. Я дал соответствующую часть кода ниже. Моя проблема в том, что код выдает ошибку компиляции для возвращаемого типа fetchData:
println ("PartB:" + hBaseRDD.getNumPartitions)
ошибка: значение getNumPartitions не является членом Option [org.apache.spark.rdd.RDD [it.nerdammer.spark.test.sys.Record]]
Я использую scala 2.11.8 spark 2.2.0 и компиляцию maven
import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sys {
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
import spark.implicits._
type Record = (String, Option[String], Option[String])
def fetchData(plant: String): RDD[Record] = {
val start_index = plant
val end_index = plant + "z"
//The below command works fine if I run it in main function, but to get multiple rows from hbase, I am using it in a separate function
spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
}
def main(args: Array[String]) {
//the below elements in the collection are prefix of relevant rowkeys in hbase table ("test_table")
val plants = Vector("a8","cu","aw","fx")
val hBaseRDD = plants.map( pp => fetchData(pp))
println("Part: "+ hBaseRDD.getNumPartitions)
/*
rest of the code
*/
}
}
Вот рабочая версия кода. Проблема в том, что я использую цикл for, и мне нужно запрашивать данные, соответствующие вектору rowkey (заводы), из HBase в каждом цикле, вместо того, чтобы сначала получать все данные, а затем выполнять остальные коды
import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sys {
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
import spark.implicits._
type Record = (String, Option[String], Option[String])
val plants = Vector("a8","cu","aw","fx")
for (plant <- plants){
val start_index = plant
val end_index = plant + "z"
val hBaseRDD = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
println("Part: "+ hBaseRDD.getNumPartitions)
/*
rest of the code
*/
}
}
}
После попытки, вот где я застрял. Итак, как я могу привести тип к требуемому.
scala> def fetchData(plant: String) = {
| val start_index = plant
| val end_index = plant + "~"
| val x1 = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
| x1
| }
Определить функцию в REPL и запустить ее
scala> val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
<console>:39: error: type mismatch;
found : org.apache.spark.rdd.RDD[(String, Option[String], Option[String])]
required: it.nerdammer.spark.hbase.HBaseReaderBuilder[(String, Option[String], Option[String])]
val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
Заранее спасибо!