Я использую Apache Flink для прогнозирования потоков из Twitter.
Код реализован в Scala
Моя проблема в том, что моей обученной модели SVM из API DataSet нужен набор данных в качествевход для Предиката () - Метод.
Я уже видел здесь вопрос, где пользователь сказал, что вам нужно написать собственную функцию MapFunction, которая читает модель при запуске задания (ref: Прогнозирование потоковой передачи в реальном времени во Flink с использованием scala )
Но я не могу написать / понять этот код.
Даже если я получу модель внутри StreamingMapFunction. Мне все еще нужен набор данных в качестве параметра, чтобы предсказать результат.
Я действительно надеюсь, что кто-то может показать / объяснить мне, как это делается.
Flink-Version: 1.9 Scala-Version: 2.11 Flink-ML: 2.11
val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)