У меня есть сценарий использования, в котором я должен применить несколько уже обученных моделей (например, M1, M2, ..Mn) к одному и тому же искровому потоку (полученному из kafka).
Модели были обучены с использованиемалгоритм изоляции леса отсюда: https://github.com/titicaca/spark-iforest
Я нашел нечто похожее с моим случаем здесь https://www.youtube.com/watch?v=EhRHQPCdldI,, но, к сожалению, я не знаю, сделала ли компания Genesys (бывшая AltoCloud) этоAPI (StreamPipeline, Heterogenous Pipeline) с открытым исходным кодом.
Я справился с этим с помощью приведенного выше кода схемы, но я не знаю, насколько оптимальным является.
//read the stream
val kafkaStreamDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", "topic")
.load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the same stream, otherwise blocked??
myModels.par.foreach(lm => {
//load the model
val model = PipelineModel.load(lm)
kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
//apply model
val pdf = model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS value").write
.format("json")
.save("anom/" + lm + System.currentTimeMillis())
}).start().awaitTermination()
})
Вопросы: 1. Следовательно,Я хотел бы знать, есть ли какой-либо Spark API для обработки такого варианта использования?
Если да, где я могу его найти?
Если нет, как я могу оптимально реализовать это?
ЛюбойИдея, предложения высоко ценится.