Apache Flink - обработка предсказаний - PullRequest
1 голос
/ 02 ноября 2019

В настоящее время я работаю с SVM-классом Apache Flink для прогнозирования некоторых текстовых данных.

Этот класс предоставляет функцию предиката, которая принимает DataSet [Vector] в качестве входных данных и дает мне DataSet [Prediction. ] как результат. Пока все хорошо.

Моя проблема в том, что у меня нет контекста, к какому тексту относится прогноз, и я не могу вставить текст в функцию предиката (), чтобы потом его иметь.

Код:

val tweets: DataSet[(SparseVector, String)] =
        source.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
                .map(tweet => (featureVectorService.transform(tweet._2))

    model.predict(tweets).print


result example:
(SparseVector((462,8.73165920153676), (10844,8.508515650222549), (15656,2.931052542245018)),-1.0)

Есть ли способ сохранить другие данные рядом с прогнозом, чтобы все было вместе? потому что без контекста предсказание мне не помогает.

Или, может быть, есть способ просто предсказать один вектор вместо DataSet, чтобы я мог вызвать функцию внутри функции карты выше.

1 Ответ

1 голос
/ 03 ноября 2019

Предиктор SVM ожидает в качестве входных данных подтип Vector. Следовательно, есть два варианта решения этой проблемы:

  1. Создайте подтип Vector, который содержит текст твита в виде тега. Затем он будет проходить через предиктор. Этот подход имеет то преимущество, что никаких дополнительных операций не требуется. Однако необходимо определить новые классы и утилиты для представления различных векторных типов с помощью тегов:
val env = ExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements("foobar", "barfo", "test")

val vectorizedInput = input.map(word => {
  val value = word.chars().sum()
  new DenseVectorWithTag(Array(value), word)
})

val svm = SVM().setBlocks(env.getParallelism)

val weights = env.fromElements(DenseVector(1.0))

svm.weightsOption = Option(weights) // skipping the training here

val predictionResult: DataSet[(DenseVectorWithTag, Double)] = svm.predict(vectorizedInput)

class DenseVectorWithTag(override val data: Array[Double], tag: String)
  extends DenseVector(data) {
  override def toString: String = "(" + super.toString + ", " + tag + ")"
}
Соедините прогноз DataSet со входом DataSet в векторизованном представлении tweets. Этот подход имеет то преимущество, что нам не нужно вводить новые классы. Цена, которую мы платим за это, является дополнительной операцией соединения, которая может быть дорогой:
val input = env.fromElements("foobar", "barfo", "test")

val vectorizedInput = input.map(word => {
  val value = word.chars().sum()
  (DenseVector(value), word)
})

val svm = SVM().setBlocks(env.getParallelism)

val weights = env.fromElements(DenseVector(1.0))

svm.weightsOption = Option(weights) // skipping the training here

val predictionResult = svm.predict(vectorizedInput.map(a => a._1))
val inputWithPrediction: DataSet[(String, Double)] = vectorizedInput
  .join(predictionResult)
  .where(0)
  .equalTo(0)
  .apply((t, p) => (t._2, p._2))
...