Я пытаюсь запустить некоторые тесты на моей локальной машине с потоковой структурой с искрой.
В пакетном режиме вот строка, с которой я имею дело:
val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows = List(
Row(
Map("ID" -> "1",
"STRUCTUREID" -> "MFCD00869853",
"MOLFILE" -> "The MOL Data",
"MOLWEIGHT" -> "803.482",
"FORMULA" -> "C44H69NO12",
"NAME" -> "Tacrolimus",
"HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
"SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
"METABOLISM" -> "The metabolism 500"
)),
Row(
Map("ID" -> "2",
"STRUCTUREID" -> "MFCD00869854",
"MOLFILE" -> "The MOL Data",
"MOLWEIGHT" -> "603.482",
"FORMULA" -> "",
"NAME" -> "Tacrolimus2",
"HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
"SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
"METABOLISM" -> "The metabolism 500"
))
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)
Работа с этим в Batch more работает как шарм, без проблем.
Теперь я пытаюсь перейти в потоковый режим, используя MemoryStream для тестирования. Я добавил следующее:
implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]
Но компилятор жалуется на следующее:
Не найдены последствия для доказательства параметра $ 1: Кодировщик [Строка]
Отсюда мой вопрос: что мне делать здесь, чтобы это заработало
Также я увидел, что если я добавлю следующий импорт, ошибка исчезнет:
import spark.implicits ._
На самом деле, теперь я получаю следующее предупреждение вместо ошибки
Неоднозначные значения для доказательства параметров $ 1: кодировщик [Row]
Я не очень хорошо понимаю механизм кодирования и был бы признателен, если бы кто-нибудь мог объяснить мне, как не использовать эти последствия. Причина в том, что я написал следующее в книге, когда речь идет о создании DataFrame из строк.
Рекомендуемая оценка:
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()
И тогда автор продолжает это:
В Scala мы также можем использовать преимущества Spark в
консоль (и если вы импортируете их в свой JAR-код), запустив toDF на
Тип Seq. Это плохо работает с нулевыми типами, поэтому
обязательно рекомендуется для производственных случаев использования.
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
Если бы кто-то мог найти время, чтобы объяснить, что происходит в моем сценарии, когда я использую неявное, и если это довольно безопасно, или есть способ сделать это более явно, не импортируя неявное.
Наконец, если бы кто-нибудь мог указать мне хороший документ по кодированию Encoder и Spark Type, это было бы здорово.
EDIT1
Наконец-то я начал работать с
implicit val ctx = spark.sqlContext
import spark.implicits._
val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
Хотя моя проблема в том, что я не уверен в том, что делаю. Мне кажется, что в какой-то ситуации мне нужно создать DataSet, чтобы иметь возможность преобразовывать его в DF [ROW] с преобразованием toDF. Я понял, что работа с DS безопасна, но медленнее, чем с DF. Так зачем этот посредник с DataSet? Это не первый раз, когда я вижу это в Spark Structured Streaming. Опять же, если бы кто-то мог помочь мне с этим, это было бы здорово.