Как спарк интерпретирует тип столбца в редукторе - PullRequest
0 голосов
/ 09 февраля 2019

У меня есть следующая таблица

DEST_COUNTRY_NAME   ORIGIN_COUNTRY_NAME count
United States       Romania             15
United States       Croatia             1
United States       Ireland             344
Egypt               United States       15  

Таблица представлена ​​в виде набора данных.

scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

Схема dataDS равна

scala> dataDS.printSchema;
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

Я хочу суммировать все значения столбца count.Я полагаю, что могу сделать это, используя reduce метод Dataset.

Я думал, что смогу сделать следующее, но получил ошибку

scala> (dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
<console>:38: error: type mismatch;
 found   : org.apache.spark.sql.Row
 required: String
       (dataDS.select(col("count"))).reduce((acc,n)=>acc+n);
                                                         ^

Чтобы заставить код работать, у меня былоявно указать, что count равно Int, хотя в схеме это Int

scala> (dataDS.select(col("count").as[Int])).reduce((acc,n)=>acc+n);

Почему я должен был явно указать тип count?Почему Scala type inference не работает?На самом деле, схема промежуточного Dataset также выводит count как Int.

dataDS.select(col("count")).printSchema;
root
 |-- count: integer (nullable = true)

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019

Просто следуйте типам или смотрите сообщения компилятора.

  • Вы начинаете с Dataset[FlightData].

  • Вы называете это select с col("count") в качестве аргумента.col(_) возвращает Column

  • Единственный вариант из Dataset.select, который принимает Column, возвращает DataFrame , который является псевдонимом для Dataset[Row].

  • Существует два варианта Dataset.reduce, один из которых принимает ReduceFunction[T], а второй (T, T) => T, где T - параметр конструктора типа Dataset, т.е. Dataset[T].(acc,n)=>acc+n функция является анонимной функцией Scala, поэтому применяется вторая версия.

  • Расширен:

    (dataDS.select(col("count")): Dataset[Row]).reduce((acc: Row, n: Row) => acc + n): Row
    

    , которая устанавливает ограничения - функция принимает Row и Row и возвращает Row.

  • Row не имеет + метода, поэтому единственный вариант удовлетворения

    (acc: ???, n: Row) => acc + n)
    

    - использовать String (Вы можете + Any до String.

    Однако это не удовлетворяет полному выражению - отсюда и ошибка.

  • Выуже выяснили, что вы можете использовать

    dataDS.select(col("count").as[Int]).reduce((acc, n) => acc + n)
    

    , где col("count").as[Int] является TypedColumn[Row, Int] и соответствует select возвращает Dataset[Int].

    Аналогично вы могли бы

    dataDS.select(col("count")).as[Int].reduce((acc, n) => acc + n)
    

    и

    dataDS.toDF.map(_.getAs[Int]("count")).reduce((acc, n) => acc + n)
    

    Во всех случаях

    .reduce((acc, n) => acc + n)
    

    составляет (Int, Int) => Int.

0 голосов
/ 09 февраля 2019

Я думаю, вам нужно сделать это по-другому.Я буду предполагать, что FlightData является классом с вышеуказанной схемой.Итак, решение использует карту и уменьшите, как показано ниже:

val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.

Обновлено : проблема вывода не связана с набором данных. На самом деле, когда вы используете select, вы будете работать надDataframe (то же самое, если вы присоединяетесь), который не является статически типизированной схемой, и вы потеряете функцию класса вашего делаНапример, тип выбора будет Dataframe, а не Dataset, поэтому вы не сможете вывести тип.

val x: DataFrame = dataDS.select('count)
val x: Dataset[Int] = dataDS.map(_.count)

Кроме того, из этого Ответ Чтобы получить TypedColumnиз столбца вы просто используете myCol.as[T].

Я сделал простой пример для воспроизведения кода и данных.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object EntryMainPoint extends App {

  //val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("SparkSessionZipsExample")
    //.config("spark.sql.warehouse.dir", warehouseLocation)
    .getOrCreate()

  val someData = Seq(
    Row("United States", "Romania", 15),
    Row("United States", "Croatia", 1),
    Row("United States", "Ireland", 344),
    Row("Egypt", "United States", 15)
  )


  val flightDataSchema = List(
    StructField("DEST_COUNTRY_NAME", StringType, true),
    StructField("ORIGIN_COUNTRY_NAME", StringType, true),
    StructField("count", IntegerType, true)
  )

  case class FlightData(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: Int)
  import spark.implicits._

  val dataDS = spark.createDataFrame(
    spark.sparkContext.parallelize(someData),
    StructType(flightDataSchema)
  ).as[FlightData]

  val totalSum = dataDS.map(_.count).reduce(_+_) //this line replace the above error as col("count") can't be selected.
  println("totalSum = " + totalSum)


  dataDS.printSchema()
  dataDS.show()


}

Вывод ниже

totalSum = 375

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
+-----------------+-------------------+-----+

Примечание:Вы можете сделать выборку из набора данных, используя приведенный ниже способ

val countColumn = dataDS.select('count) //or map(_.count)

. Вы также можете посмотреть этот lowerByKey в наборе данных Spark

...