Невозможно передать Seq [String] .parquet в Spark Scala - PullRequest
0 голосов
/ 04 февраля 2019

Я пытаюсь прочитать несколько путей за один вызов в Spark API в Scala с помощью метода .parquet.

У меня есть метод, который получает Seq[String], но, похоже,не может распознать его при включении в вызов метода и пытается получить String вместо Seq[String].

def readPaths(sparkSession: SparkSession, basePath: String, inputPaths: Seq[String]): Dataset[Row] = {
  sparkSession.read
    .option("basepath", basePath)
    .parquet(inputPaths) // Doesn't accept 'inputPaths'

}

В прокомментированной части он просто жалуется наSeq[String] не является объектом типа String, в то же время он принимает простой "", "", "", "".

Ответы [ 2 ]

0 голосов
/ 04 февраля 2019

Я думаю, что функция parquet() ожидает аргумент "varargs", то есть один или несколько аргументов типа String.

Вы можете передать его Seq[String], но вы должны дать подсказке компилятору указание распаковать Seq в несколько аргументов.

Пример, демонстрирующий использование varargs:

scala> def foo(i: String*) = i.mkString(",")
foo: (i: String*)String

scala> foo("a", "b", "c")
res0: String = a,b,c

scala> foo(Seq("a", "b", "c"))
<console>:13: error: type mismatch;
 found   : Seq[String]
 required: String
       foo(Seq("a", "b", "c"))
              ^

scala> foo(Seq("a", "b", "c"):_*)
res2: String = a,b,c

Как видите, подсказка :_* устраняет проблему.

0 голосов
/ 04 февраля 2019

Метод:

def parquet(paths: String*): DataFrame

ожидают переменные, а не явные следствия.По этой причине в Scala вы должны передать его как:

    def readPaths(sparkSession: SparkSession, basePath: String, inputPaths: Seq[String]): Dataset[Row] = {
  sparkSession.read
    .option("basepath", basePath)
    .parquet(inputPaths:_*)
  }

, пожалуйста, отметьте ": _ *" в конце вашего значения.

Проверено на spark2-shell (с Spark 2.3.0.cloudera3):

scala> case class MyProduct(key: Int, value: String, lastSeen: java.sql.Timestamp)
defined class MyProduct

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val baseDS = spark.createDataset(0 until 1000).map(i => MyProduct(i, s"valueOf:$i", new java.sql.Timestamp(System.currentTimeMillis())))
baseDS: org.apache.spark.sql.Dataset[MyProduct] = [key: int, value: string ... 1 more field]

scala> baseDS.withColumn("state", lit("IT"))
res10: org.apache.spark.sql.DataFrame = [key: int, value: string ... 2 more fields]

scala> res10.write.mode("overwrite").partitionBy("state").parquet("/tmp/test/parquet/")

scala> baseDS.withColumn("state", lit("US"))
res12: org.apache.spark.sql.DataFrame = [key: int, value: string ... 2 more fields]

scala> res12.write.mode("append").partitionBy("state").parquet("/tmp/test/parquet/")

scala> val inputPaths = Seq("/tmp/test/parquet/state=IT", "/tmp/test/parquet/state=US")
inputPaths: Seq[String] = List(/tmp/test/parquet/state=IT, /tmp/test/parquet/state=US)

scala> val data = spark.read.parquet(inputPaths)
<console>:38: error: overloaded method value parquet with alternatives:
  (paths: String*)org.apache.spark.sql.DataFrame <and>
  (path: String)org.apache.spark.sql.DataFrame
 cannot be applied to (Seq[String])
       val data = spark.read.parquet(inputPaths)
                             ^

scala> val data = spark.read.parquet(inputPaths:_*)
data: org.apache.spark.sql.DataFrame = [key: int, value: string ... 1 more field]

scala> data.show(10)
+---+-----------+--------------------+
|key|      value|            lastSeen|
+---+-----------+--------------------+
|500|valueOf:500|2019-02-04 17:05:...|
|501|valueOf:501|2019-02-04 17:05:...|
|502|valueOf:502|2019-02-04 17:05:...|
|503|valueOf:503|2019-02-04 17:05:...|
|504|valueOf:504|2019-02-04 17:05:...|
|505|valueOf:505|2019-02-04 17:05:...|
|506|valueOf:506|2019-02-04 17:05:...|
|507|valueOf:507|2019-02-04 17:05:...|
|508|valueOf:508|2019-02-04 17:05:...|
|509|valueOf:509|2019-02-04 17:05:...|
+---+-----------+--------------------+
only showing top 10 rows


scala>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...