Apache Flink - SVM прогнозы на потоковые данные - PullRequest
0 голосов
/ 23 октября 2019

Я использую Apache Flink для прогнозирования потоков из Twitter.

Код реализован в Scala

Моя проблема в том, что моей обученной модели SVM из API DataSet нужен набор данных в качествевход для Предиката () - Метод.

Я уже видел здесь вопрос, где пользователь сказал, что вам нужно написать собственную функцию MapFunction, которая читает модель при запуске задания (ref: Прогнозирование потоковой передачи в реальном времени во Flink с использованием scala )

Но я не могу написать / понять этот код.

Даже если я получу модель внутри StreamingMapFunction. Мне все еще нужен набор данных в качестве параметра, чтобы предсказать результат.

Я действительно надеюсь, что кто-то может показать / объяснить мне, как это делается.

Flink-Version: 1.9 Scala-Version: 2.11 Flink-ML: 2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
        featureVectorService.learnTrainingData(labeledData, false)

//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
        val svm = SVM()
                .setBlocks(env.getParallelism)
                .setIterations(100)
                .setRegularization(0.001)
                .setStepsize(0.1)
                .setSeed(42)
//learning
svm.fit(trainingData)

//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
                .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)

1 Ответ

1 голос
/ 24 октября 2019

Итак, в настоящее время FlinkML, частью которого является SVM, не поддерживает потоковый API. Вот почему SVM принимает только DataSet. Идея не в том, чтобы использовать FlinkML, а в некоторой библиотеке SVM, доступной в scala или java. Затем вы можете прочитать модель, например, из файла. Проблема в том, что Вы должны реализовать большую часть логики самостоятельно.

Комментарий в сообщении, которое Вы упомянули, более или менее говорит о том же самом.

...