Невозможно оценить модель ML для структурированной потоковой передачи, поскольку преобразования и действия RDD вызываются внутри других преобразований. - PullRequest
0 голосов
/ 03 мая 2018

Это хорошо известное ограничение [1] структурированной потоковой передачи, которое я пытаюсь обойти, используя пользовательский приемник.

В дальнейшем modelsMap является картой строковых ключей для org.apache.spark.mllib.stat.KernelDensity моделей

и streamingData это потоковый фрейм данных org.apache.spark.sql.DataFrame = [id1: string, id2: string ... 6 more fields]

Я пытаюсь сравнить каждую строку streamingData с соответствующей моделью из modelsMap, улучшить каждую строку с помощью prediction и написать в Kakfa.

Очевидным способом было бы .withColumn, с использованием UDF для прогнозирования и записи с использованием приемника kafka.

Но это незаконно, потому что:

org.apache.spark.SparkException: This RDD lacks a SparkContext. It 
could happen in the following cases: (1) RDD transformations and 
actions are NOT invoked by the driver, but inside of other 
transformations; for example, rdd1.map(x => rdd2.values.count() * x) is 
invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, 
see SPARK-5063.

Я получаю ту же ошибку с пользовательским приемником, который реализует forEachWriter, что было немного неожиданно:

    import org.apache.spark.sql.ForeachWriter
    import java.util.Properties
    import kafkashaded.org.apache.kafka.clients.producer._

     class  customSink(topic:String, servers:String) extends ForeachWriter[(org.apache.spark.sql.Row)] {
          val kafkaProperties = new Properties()
          kafkaProperties.put("bootstrap.servers", servers)
          kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
          kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
          val results = new scala.collection.mutable.HashMap[String, String]
          var producer: KafkaProducer[String, String] = _

          def open(partitionId: Long,version: Long): Boolean = {
            producer = new KafkaProducer(kafkaProperties)
            true
          }

          def process(value: (org.apache.spark.sql.Row)): Unit = {
            var prediction = Double.NaN
            try {
                val id1 = value(0)
                val id2 = value(3)
                val id3 = value(5)
                val time_0 = value(6).asInstanceOf[Double]
                val key = f"$id1/$id2/$id3" 
                var model = modelsMap(key)
                println("Looking up key: ",key)
                var prediction = Double.NaN
                prediction = model.estimate(Array[Double](time_0))(0)
                println(prediction)
            } catch {
                case e: NoSuchElementException =>
                val prediction = Double.NaN
                println(prediction)
            }    
              producer.send(new ProducerRecord(topic, value.mkString(",")+","+prediction.toString))
          }

          def close(errorOrNull: Throwable): Unit = {
            producer.close()
          }
       }

val writer = new customSink("<broker>", "<topic>")

val query = streamingData
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()

model.estimate реализован под капотом с использованием aggregate в mllib.stat, и нет способа обойти его.

Какие изменения я делаю? (Я мог бы collect каждый пакет и выполнить цикл for, используя драйвер, но тогда я не использую спарк так, как это было задумано)

Рекомендации:

  1. https://www.slideshare.net/databricks/realtime-machine-learning-analytics-using-structured-streaming-and-kinesis-firehose слайд № 11 упоминает ограничения

  2. https://www.oreilly.com/learning/extend-structured-streaming-for-spark-ml

  3. https://github.com/holdenk/spark-structured-streaming-ml (предлагаемое решение)

  4. https://issues.apache.org/jira/browse/SPARK-16454

  5. https://issues.apache.org/jira/browse/SPARK-16407

...