как сделать быстрее оконный текстовый файл и машинное обучение через окна в искре - PullRequest
0 голосов
/ 11 апреля 2019

Я пытаюсь использовать Spark для изучения мультиклассовой логистической регрессии в оконном текстовом файле.То, что я делаю, это сначала создаю окна и взрываю их в $"word_winds".Затем переместите центральное слово каждого окна в $"word".Чтобы соответствовать модели LogisticRegression, я конвертирую каждое отдельное слово в класс ($"label"), тем самым он учится.Я рассчитываю на разные метки, чтобы отсеивать те, у которых мало образцов minF.

Проблема в том, что некоторая часть кода очень очень медленная, даже для небольших входных файлов (вы можете использовать какой-нибудь файл README для проверки кода).Гугл, некоторые пользователи испытывают медлительность при использовании explode.Они предлагают некоторые модификации кода для ускорения в 2 раза.Тем не менее, я думаю, что с входным файлом 100 МБ этого было бы недостаточно.Пожалуйста, предложите что-то другое, возможно, чтобы избежать действий, которые замедляют код.Я использую Spark 2.4.0 и sbt 1.2.8 на 24-ядерном компьютере.

import org.apache.spark.sql.functions._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._



object SimpleApp {
  def main(args: Array[String]) {

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

spark.sparkContext.setCheckpointDir("checked_dfs")

val in_file = "sample.txt"
val stratified = true
val wsize = 7
val ngram = 3
val minF = 2

val windUdf = udf{s: String => s.sliding(ngram).toList.sliding(wsize).toList}
val get_mid = udf{s: Seq[String] => s(s.size/2)}
val rm_punct = udf{s: String => s.replaceAll("""([\p{Punct}|¿|\?|¡|!]|\p{C}|\b\p{IsLetter}{1,2}\b)\s*""", "")}

// Read and remove punctuation
var df = spark.read.text(in_file)
                    .withColumn("value", rm_punct($"value"))

// Creating windows and explode them, and get the center word into $"word" 
df = df.withColumn("char_nGrams", windUdf('value))
        .withColumn("word_winds", explode($"char_nGrams"))
        .withColumn("word", get_mid('word_winds))
val indexer = new StringIndexer().setInputCol("word")
                                    .setOutputCol("label")
df = indexer.fit(df).transform(df)

val hashingTF = new HashingTF().setInputCol("word_winds")
                                .setOutputCol("freqFeatures")
df = hashingTF.transform(df)
val idf = new IDF().setInputCol("freqFeatures")
                    .setOutputCol("features")
df = idf.fit(df).transform(df)
// Remove word whose freq is less than minF
var counts = df.groupBy("label").count
                                .filter(col("count") > minF)
                                .orderBy(desc("count"))
                                .withColumn("id", monotonically_increasing_id())
var filtro = df.groupBy("label").count.filter(col("count") <= minF)
df = df.join(filtro, Seq("label"), "leftanti")
var dfs = if(stratified){
// Create stratified sample 'dfs'
        var revs = counts.orderBy(asc("count")).select("count")
                                                .withColumn("id", monotonically_increasing_id())
        revs = revs.withColumnRenamed("count", "ascc")
// Weigh the labels (linearly) inversely ("ascc") proportional NORMALIZED weights to word ferquency

        counts = counts.join(revs, Seq("id"), "inner").withColumn("weight", col("ascc")/df.count)
        val minn = counts.select("weight").agg(min("weight")).first.getDouble(0) - 0.01
        val maxx = counts.select("weight").agg(max("weight")).first.getDouble(0) - 0.01
        counts = counts.withColumn("weight_n", (col("weight") - minn) / (maxx - minn))
        counts = counts.withColumn("weight_n", when(col("weight_n") > 1.0, 1.0)
                       .otherwise(col("weight_n")))
        var fractions = counts.select("label", "weight_n").rdd.map(x => (x(0), x(1)
                                .asInstanceOf[scala.Double])).collectAsMap.toMap
        df.stat.sampleBy("label", fractions, 36L).select("features", "word_winds", "word", "label")
        }else{ df }
dfs = dfs.checkpoint()

val lr = new LogisticRegression().setRegParam(0.01)

val Array(tr, ts) = dfs.randomSplit(Array(0.7, 0.3), seed = 12345)
val training = tr.select("word_winds", "features", "label", "word")
val test = ts.select("word_winds", "features", "label", "word")

val model = lr.fit(training)

def mapCode(m: scala.collection.Map[Any, String]) = udf( (s: Double) =>
                m.getOrElse(s, "")
        )
var labels = training.select("label", "word").distinct.rdd
                                             .map(x => (x(0), x(1).asInstanceOf[String]))
                                             .collectAsMap
var predictions = model.transform(test)
predictions = predictions.withColumn("pred_word", mapCode(labels)($"prediction"))
predictions.write.format("csv").save("spark_predictions")

spark.stop()
  }
}

1 Ответ

0 голосов
/ 11 апреля 2019

Поскольку ваши данные несколько малы, может быть полезно, если вы используете коалесценцию перед разнесением.Иногда может быть неэффективно иметь слишком много узлов, особенно если в вашем коде много перестановок.

Как вы сказали, у многих людей возникают проблемы с взрывом.Я посмотрел на предоставленную вами ссылку, но никто не упомянул, что пытался flatMap вместо взрыва.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...