Это хорошо известное ограничение [1] структурированной потоковой передачи, которое я пытаюсь обойти, используя пользовательский приемник.
В дальнейшем modelsMap
является картой строковых ключей для org.apache.spark.mllib.stat.KernelDensity
моделей
и
streamingData
это потоковый фрейм данных org.apache.spark.sql.DataFrame = [id1: string, id2: string ... 6 more fields]
Я пытаюсь сравнить каждую строку streamingData
с соответствующей моделью из modelsMap
, улучшить каждую строку с помощью prediction
и написать в Kakfa.
Очевидным способом было бы .withColumn
, с использованием UDF для прогнозирования и записи с использованием приемника kafka.
Но это незаконно, потому что:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It
could happen in the following cases: (1) RDD transformations and
actions are NOT invoked by the driver, but inside of other
transformations; for example, rdd1.map(x => rdd2.values.count() * x) is
invalid because the values transformation and count action cannot be
performed inside of the rdd1.map transformation. For more information,
see SPARK-5063.
Я получаю ту же ошибку с пользовательским приемником, который реализует forEachWriter
, что было немного неожиданно:
import org.apache.spark.sql.ForeachWriter
import java.util.Properties
import kafkashaded.org.apache.kafka.clients.producer._
class customSink(topic:String, servers:String) extends ForeachWriter[(org.apache.spark.sql.Row)] {
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
val results = new scala.collection.mutable.HashMap[String, String]
var producer: KafkaProducer[String, String] = _
def open(partitionId: Long,version: Long): Boolean = {
producer = new KafkaProducer(kafkaProperties)
true
}
def process(value: (org.apache.spark.sql.Row)): Unit = {
var prediction = Double.NaN
try {
val id1 = value(0)
val id2 = value(3)
val id3 = value(5)
val time_0 = value(6).asInstanceOf[Double]
val key = f"$id1/$id2/$id3"
var model = modelsMap(key)
println("Looking up key: ",key)
var prediction = Double.NaN
prediction = model.estimate(Array[Double](time_0))(0)
println(prediction)
} catch {
case e: NoSuchElementException =>
val prediction = Double.NaN
println(prediction)
}
producer.send(new ProducerRecord(topic, value.mkString(",")+","+prediction.toString))
}
def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
val writer = new customSink("<broker>", "<topic>")
val query = streamingData
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
model.estimate
реализован под капотом с использованием aggregate
в mllib.stat, и нет способа обойти его.
Какие изменения я делаю? (Я мог бы collect
каждый пакет и выполнить цикл for, используя драйвер, но тогда я не использую спарк так, как это было задумано)
Рекомендации:
https://www.slideshare.net/databricks/realtime-machine-learning-analytics-using-structured-streaming-and-kinesis-firehose слайд № 11 упоминает ограничения
https://www.oreilly.com/learning/extend-structured-streaming-for-spark-ml
https://github.com/holdenk/spark-structured-streaming-ml (предлагаемое решение)
https://issues.apache.org/jira/browse/SPARK-16454
https://issues.apache.org/jira/browse/SPARK-16407