Ошибка возврата искры rdd из функции, вызываемой внутри функции map - PullRequest
0 голосов
/ 04 мая 2019

У меня есть коллекция строк строк (растений, как показано ниже) из таблицы hbase, и я хочу создать функцию fetchData, которая возвращает данные rdd для строк строк из коллекции. Цель состоит в том, чтобы получить объединение RDD из метода fetchData для каждого элемента из коллекции растений. Я дал соответствующую часть кода ниже. Моя проблема в том, что код выдает ошибку компиляции для возвращаемого типа fetchData:

println ("PartB:" + hBaseRDD.getNumPartitions)

ошибка: значение getNumPartitions не является членом Option [org.apache.spark.rdd.RDD [it.nerdammer.spark.test.sys.Record]]

Я использую scala 2.11.8 spark 2.2.0 и компиляцию maven

import it.nerdammer.spark.hbase._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object sys {
  case class systems( rowkey: String, iacp: Option[String], temp: Option[String])

  val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
  import spark.implicits._

  type Record = (String, Option[String], Option[String])

  def fetchData(plant: String): RDD[Record] = {
    val start_index = plant
    val end_index = plant + "z"
    //The below command works fine if I run it in main function, but to get multiple rows from hbase, I am using it in a separate function
    spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)

  }

  def main(args: Array[String]) {
    //the below elements in the collection are prefix of relevant rowkeys in hbase table ("test_table") 
    val plants = Vector("a8","cu","aw","fx")
    val hBaseRDD = plants.map( pp => fetchData(pp))
    println("Part: "+ hBaseRDD.getNumPartitions)
    /*
      rest of the code
    */
  }

}

Вот рабочая версия кода. Проблема в том, что я использую цикл for, и мне нужно запрашивать данные, соответствующие вектору rowkey (заводы), из HBase в каждом цикле, вместо того, чтобы сначала получать все данные, а затем выполнять остальные коды

    import it.nerdammer.spark.hbase._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
    import org.apache.log4j.Level
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    object sys {
      case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
      def main(args: Array[String]) {

        val spark = SparkSession.builder().appName("myApp").config("spark.executor.cores",4).getOrCreate()
        import spark.implicits._

        type Record = (String, Option[String], Option[String])
        val plants = Vector("a8","cu","aw","fx")

        for (plant <- plants){
          val start_index = plant
          val end_index = plant + "z"
          val hBaseRDD = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
          println("Part: "+ hBaseRDD.getNumPartitions)
          /*
            rest of the code
          */
        }
      }
    }

После попытки, вот где я застрял. Итак, как я могу привести тип к требуемому.

scala>   def fetchData(plant: String) = {
     |     val start_index = plant
     |     val end_index = plant + "~"
     |     val x1 = spark.sparkContext.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("pp").withStartRow(start_index).withStopRow(end_index)
     |     x1
     |   }

Определить функцию в REPL и запустить ее

scala> val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)
<console>:39: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Option[String], Option[String])]
 required: it.nerdammer.spark.hbase.HBaseReaderBuilder[(String, Option[String], Option[String])]
       val hBaseRDD = plants.map( pp => fetchData(pp)).reduceOption(_ union _)

Заранее спасибо!

1 Ответ

3 голосов
/ 04 мая 2019

Тип hBaseRDD - Vector[_], а не RDD[_], поэтому вы не можете выполнить метод getNumPartitions для него. Если я правильно понимаю, вы хотите объединить извлеченные RDD. Вы можете сделать это с помощью plants.map( pp => fetchData(pp)).reduceOption(_ union _) (я рекомендую использовать reduceOption, потому что в пустом списке это не сработает, но вы можете использовать reduce, если уверены, что список не пустой)

Также возвращается тип fetchData RDD[U], но я не нашел определения U. Вероятно, это причина, по которой компилятор выводит Vector[Nothing] вместо Vector[RDD[Record]]. Чтобы избежать последующих ошибок, вы также должны изменить RDD[U] на RDD[Record].

...