Spark Structured Streaming Проблема с MemoryStream + Row + Encoders - PullRequest
0 голосов
/ 08 сентября 2018

Я пытаюсь запустить некоторые тесты на моей локальной машине с потоковой структурой с искрой.

В пакетном режиме вот строка, с которой я имею дело:

val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows         = List(
    Row(
      Map("ID" -> "1",
        "STRUCTUREID" -> "MFCD00869853",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "803.482",
        "FORMULA" -> "C44H69NO12",
        "NAME" -> "Tacrolimus",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
       )),
    Row(
      Map("ID" -> "2",
        "STRUCTUREID" -> "MFCD00869854",
        "MOLFILE" -> "The MOL Data",
        "MOLWEIGHT" -> "603.482",
        "FORMULA" -> "",
        "NAME" -> "Tacrolimus2",
        "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
        "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
        "METABOLISM" -> "The metabolism 500"
      ))
  )
val df  = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)

Работа с этим в Batch more работает как шарм, без проблем.

Теперь я пытаюсь перейти в потоковый режим, используя MemoryStream для тестирования. Я добавил следующее:

implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]

Но компилятор жалуется на следующее:

Не найдены последствия для доказательства параметра $ 1: Кодировщик [Строка]

Отсюда мой вопрос: что мне делать здесь, чтобы это заработало

Также я увидел, что если я добавлю следующий импорт, ошибка исчезнет:

import spark.implicits ._

На самом деле, теперь я получаю следующее предупреждение вместо ошибки

Неоднозначные значения для доказательства параметров $ 1: кодировщик [Row]

Я не очень хорошо понимаю механизм кодирования и был бы признателен, если бы кто-нибудь мог объяснить мне, как не использовать эти последствия. Причина в том, что я написал следующее в книге, когда речь идет о создании DataFrame из строк.

Рекомендуемая оценка:

val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

И тогда автор продолжает это:

В Scala мы также можем использовать преимущества Spark в консоль (и если вы импортируете их в свой JAR-код), запустив toDF на Тип Seq. Это плохо работает с нулевыми типами, поэтому обязательно рекомендуется для производственных случаев использования.

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

Если бы кто-то мог найти время, чтобы объяснить, что происходит в моем сценарии, когда я использую неявное, и если это довольно безопасно, или есть способ сделать это более явно, не импортируя неявное.

Наконец, если бы кто-нибудь мог указать мне хороший документ по кодированию Encoder и Spark Type, это было бы здорово.

EDIT1

Наконец-то я начал работать с

  implicit val ctx = spark.sqlContext
  import spark.implicits._
  val rows = MemoryStream[Map[String,String]]
  val df = rows.toDF()

Хотя моя проблема в том, что я не уверен в том, что делаю. Мне кажется, что в какой-то ситуации мне нужно создать DataSet, чтобы иметь возможность преобразовывать его в DF [ROW] с преобразованием toDF. Я понял, что работа с DS безопасна, но медленнее, чем с DF. Так зачем этот посредник с DataSet? Это не первый раз, когда я вижу это в Spark Structured Streaming. Опять же, если бы кто-то мог помочь мне с этим, это было бы здорово.

1 Ответ

0 голосов
/ 06 мая 2019

Я рекомендую вам использовать Scala case classes для моделирования данных.

final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

Теперь вы можете иметь List из Product в памяти:

  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

API-интерфейс структурированной потоковой передачи упрощает анализ потоковой обработки с помощью широко известной абстракции Dataset[T]. Грубо говоря, вам просто нужно беспокоиться о трех вещах:

  • Источник : источник может генерировать поток входных данных, который мы можем представить как Dataset[Input]. Каждый новый элемент данных Input, который поступает, будет добавляться в этот неограниченный набор данных. Вы можете манипулировать данными по своему усмотрению (например, Dataset[Input] => Dataset[Output]).
  • StreamingQueries и Sink : запрос генерирует таблицу результатов, которая обновляется из источника каждый интервал триггера. Изменения записываются во внешнее хранилище, называемое Sink.
  • Режимы вывода : существуют различные режимы, в которых вы можете записывать данные в Sink: режим завершения, режим добавления и режим обновления.

Предположим, вы хотите знать продукты, которые имеют молекулярную массу более 200 единиц.

Как вы сказали, использовать пакетный API довольно просто и просто:

// Create an static dataset using the in-memory data
val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)

// Processing...
val result: Dataset[Product] = staticData.filter(_.weight > 200)

// Print results!
result.show()

При использовании Streaming API вам просто нужно определить source и sink в качестве дополнительного шага. В этом примере мы можем использовать MemoryStream и приемник console для распечатки результатов.

// Create an streaming dataset using the in-memory data (memory source)
val productSource = MemoryStream[Product]
productSource.addData(inMemoryRecords)

val streamingData: Dataset[Product] = productSource.toDS()

// Processing...
val result: Dataset[Product] = streamingData.filter(_.weight > 200)

// Print results by using the console sink. 
val query: StreamingQuery = result.writeStream.format("console").start()

// Stop streaming
query.awaitTermination(timeoutMs=5000)
query.stop()

Обратите внимание, что staticData и streamingData имеют точную сигнатуру типа (т.е. Dataset[Product]). Это позволяет нам применять одни и те же этапы обработки независимо от использования пакетного или потокового API. Вы также можете подумать о реализации общего метода def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???, чтобы избежать повторения в обоих подходах.

Полный пример кода:

object ExMemoryStream extends App {

  // Boilerplate code...
  val spark: SparkSession = SparkSession.builder
    .appName("ExMemoryStreaming")
    .master("local[*]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Define your data models 
  final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

  // Create some in-memory instances
  val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
  )

  // Defining processing step
  def processing(inputData: Dataset[Product]): Dataset[Product] =
    inputData.filter(_.weight > 200)

  // STATIC DATASET
  val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)

  println("This is the static dataset:")
  processing(datasetStatic).show()

  // STREAMING DATASET
  val productSource = MemoryStream[Product]
  productSource.addData(inMemoryRecords)

  val datasetStreaming: Dataset[Product] = productSource.toDS()

  println("This is the streaming dataset:")
  val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
  query.awaitTermination(timeoutMs=5000)

  // Stop query and close Spark
  query.stop()
  spark.close()

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...