невозможно сохранить элементы строки набора данных через mapPartitions () в переменных - PullRequest
1 голос
/ 10 ноября 2019

Я пытаюсь создать набор данных Spark, а затем использую mapPartitions, пытаюсь получить доступ к каждому из его элементов и сохранить их в переменных. Используя приведенный ниже фрагмент кода для того же самого:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = spark.sql("select col1,col2,col3 from table limit 10")

val schema = StructType(Seq(
              StructField("col1", StringType),
              StructField("col2", StringType),
              StructField("col3", StringType)))

val encoder = RowEncoder(schema)

df.mapPartitions{iterator => { val myList = iterator.toList
                 myList.map(x=> { val value1 = x.getString(0)
                                  val value2 = x.getString(1)
                                  val value3 = x.getString(2)}).iterator}} (encoder)

Я получаю сообщение об ошибке этого кода:

<console>:39: error: type mismatch;
 found   : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Encoder[Unit]
 val value3 = x.getString(2)}).iterator}} (encoder)

В конечном итоге я нацеливаюсь на сохранение элементов строки в переменных,и выполнить некоторые операции с ними. Не уверен, что мне здесь не хватает. Любая помощь в этом направлении будет принята с благодарностью!

1 Ответ

0 голосов
/ 10 ноября 2019

На самом деле, есть несколько проблем с вашим кодом:

  1. У вашего оператора-карты нет возвращаемого значения, поэтому Unit
  2. Если вы возвращаете кортеж String из mapPartitionsвам не нужно RowEncoder (потому что вы не возвращаете Row, а Tuple3, который не нуждается в кодировщике, потому что его Product)

ВыВы можете написать свой код следующим образом:

df
 .mapPartitions{itr => itr.map(x=> (x.getString(0),x.getString(1),x.getString(2)))}
 .toDF("col1","col2","col3") // Convert Dataset to Dataframe, get desired field names

Но вы можете просто использовать простой оператор select в DataFrame API, здесь нет необходимости mapPartitions

df
.select($"col1",$"col2",$"col3")
...