Как применить к нескольким трубопроводам ML (моделям) к одному и тому же искровому потоку - PullRequest
0 голосов
/ 31 октября 2019

У меня есть сценарий использования, в котором я должен применить несколько уже обученных моделей (например, M1, M2, ..Mn) к одному и тому же искровому потоку (полученному из kafka).

Модели были обучены с использованиемалгоритм изоляции леса отсюда: https://github.com/titicaca/spark-iforest

Я нашел нечто похожее с моим случаем здесь https://www.youtube.com/watch?v=EhRHQPCdldI,, но, к сожалению, я не знаю, сделала ли компания Genesys (бывшая AltoCloud) этоAPI (StreamPipeline, Heterogenous Pipeline) с открытым исходным кодом.

Я справился с этим с помощью приведенного выше кода схемы, но я не знаю, насколько оптимальным является.

//read the stream
val kafkaStreamDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "topic")
      .load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the same stream, otherwise blocked??
 myModels.par.foreach(lm => {

     //load the model     
     val model = PipelineModel.load(lm)

      kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
        //apply model
        val pdf = model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS value").write
          .format("json")
          .save("anom/" + lm +  System.currentTimeMillis())
      }).start().awaitTermination()
    })

Вопросы: 1. Следовательно,Я хотел бы знать, есть ли какой-либо Spark API для обработки такого варианта использования?

Если да, где я могу его найти?

Если нет, как я могу оптимально реализовать это?

ЛюбойИдея, предложения высоко ценится.

1 Ответ

0 голосов
/ 04 ноября 2019

AFAIK это можно сделать как this .... Но что, если ваше время обработки превышает время получения. сообщения будут накапливаться, что приведет к замедлению потокового приема. iForest использует древовидную структуру для моделирования данных. для завершения алгоритма потребуется некоторое время.

Я предпочитаю хранить в хранилище, как раздел hdfs ... и применять ML к нему пакетным способом с фиксированными интервалами времени. чтобы вы получали сообщения без задержки и эффективно обрабатывали их.

...