Scala: как взять любую общую последовательность в качестве входных данных для этого метода - PullRequest
0 голосов
/ 01 января 2019

Scala Noob здесь.Все еще пытаюсь изучить синтаксис.

Я пытаюсь сократить код, который мне нужно написать, чтобы преобразовать мои тестовые данные в DataFrames.Вот что у меня есть сейчас:

  def makeDf[T](seq: Seq[(Int, Int)], colNames: String*): Dataset[Row] = {
    val context = session.sqlContext
    import context.implicits._
    seq.toDF(colNames: _*)
  }

Проблема в том, что вышеописанный метод принимает только последовательность формы Seq[(Int, Int)] в качестве входных данных.Как мне сделать так, чтобы любая последовательность входила?Я могу изменить форму ввода на Seq[AnyRef], но затем код не может распознать вызов toDF как действительный символ.

Я не могу понять, как заставить это работать.Есть идеи?Спасибо!

Ответы [ 2 ]

0 голосов
/ 01 января 2019

Как @AssafMendelson уже объяснил, настоящая причина того, почему вы не можете создать Dataset из Any, заключается в том, что Spark требуется Encoder для преобразования объектов из них JVM представление для его внутреннего представление - и Spark не может гарантировать генерацию такого Encoder для Any type.

Ответы Assaf верны и будут работать.
Однако, ИМХО, это слишком ограничительно, так как будет работать только для Products (кортежей и классов case) - и даже если это включает в себя большинство вариантов использования, есть еще несколько исключенных.

Поскольку вам действительно нужен Encoder, вы можете оставить эту ответственность за клиентом.Который в большинстве случаев будет нуждаться только в вызове import spark.implicits._, чтобы получить их в области видимости.
Таким образом, это то, что я считаю, будет самым общим решением.

import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}

// Implicit SparkSession to make the call to further methods more transparent.
implicit val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

def makeDf[T: Encoder](seq: Seq[T], colNames: String*)
                      (implicit spark: SparkSession): DataFrame =
  spark.createDataset(seq).toDF(colNames: _*)

def makeDS[T: Encoder](seq: Seq[T])
                      (implicit spark: SparkSession): Dataset[T] =
  spark.createDataset(seq)

Примечание: Это в основном переизобретает уже определенные функции из Spark.

0 голосов
/ 01 января 2019

Краткий ответ:

import scala.reflect.runtime.universe.TypeTag

def makeDf[T <: Product: TypeTag](seq: Seq[T], colNames: String*): DataFrame = ...

Объяснение:

Когда вы вызываете seq.toDF, вы фактически используете неявное значение, определенное в SQLImplicits:

implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}

, котороев свою очередь требуется генерация кодера.Проблема в том, что кодеры определены только для определенных типов.Конкретно Product (т. Е. Кортеж, класс case и т. Д.) Вам также необходимо добавить неявный TypeTag, чтобы Scala мог преодолеть стирание типа (во время выполнения все последовательности имеют последовательность типов независимо от универсального типа. TypeTag предоставляет информацию об этом).

В качестве побочного узла вам не нужно извлекать sqlcontext из сеанса, вы можете просто использовать:

import sparkSession.implicits._
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...