Искра dataFrame для-если l oop занимает много времени - PullRequest
1 голос
/ 17 февраля 2020

У меня есть Spark DF (df):

enter image description here

Я должен преобразовать ниже что-то вроде этого:

enter image description here

По сути, он должен обнаруживать новое предложение всякий раз, когда находит точку остановки (".") И другую строку.

Я написал код для выше:

val spark = SparkSession.builder.appName("elasticSpark").master("local[*]").config("spark.scheduler.mode", "FAIR").getOrCreate()
val count = df.count.toInt
var emptyDF = Seq.empty[(Int, Int, String)].toDF("start_time", "end_time", "Sentences")
var b = 0
for (a <- 1 to count){
if(d9.select("words").head(a)(a-1).toSeq.head == "." || a == (count-1))
{
val myList1 = d9.select("words").head(a).toArray.map(_.getString(0))
val myList = d9.select("words").head(a).toArray.map(_.getString(0)).splitAt(b)._2
val text = myList.mkString(" ")
val end_time = d9.select("end_time").head(a)(a-1).toSeq.head.toString.toInt
val start_time = d9.select("start_time").head(a)(b).toSeq.head.toString.toInt
val df1 = spark.sparkContext.parallelize(Seq(start_time)).toDF("start_time")
val df2 = spark.sparkContext.parallelize(Seq(end_time)).toDF("end_time")
val df3 = spark.sparkContext.parallelize(Seq(text)).toDF("Sentences")
val df4 = df1.crossJoin(df2).crossJoin(df3)
emptyDF = emptyDF.union(df4).toDF
b = a
}
}

Несмотря на то, что он дает правильный вывод, но для завершения итерации ему требуется возраст, и у меня есть 117 других df, которые мне нужно запустить.

Любой другой способ настройки этого кода или любого другого способ достижения вышеуказанной операции? Любая помощь будет высоко оценена.

Ответы [ 3 ]

1 голос
/ 18 февраля 2020
scala> import org.apache.spark.sql.expressions.Window

scala> df.show(false)
+----------+--------+--------+
|start_time|end_time|words   |
+----------+--------+--------+
|132       |135     |Hi      |
|135       |135     |,       |
|143       |152     |I       |
|151       |152     |am      |
|159       |169     |working |
|194       |197     |on      |
|204       |211     |hadoop  |
|211       |211     |.       |
|218       |222     |This    |
|226       |229     |is      |
|234       |239     |Spark   |
|245       |249     |DF      |
|253       |258     |coding  |
|258       |258     |.       |
|276       |276     |I       |
+----------+--------+--------+


scala> val w = Window.orderBy("start_time", "end_time")

scala> df.withColumn("temp", sum(when(lag(col("words"), 1).over(w) === ".", lit(1)).otherwise(lit(0))).over(w))
             .groupBy("temp").agg(min("start_time").alias("start_time"), max("end_time").alias("end_time"),concat_ws(" ",collect_list(trim(col("words")))).alias("sentenses"))
             .drop("temp")
             .show(false)
+----------+--------+-----------------------------+
|start_time|end_time|sentenses                    |
+----------+--------+-----------------------------+
|132       |211     |Hi , I am working on hadoop .|
|218       |258     |This is Spark DF coding .    |
|276       |276     |I                            |
+----------+--------+-----------------------------+
1 голос
/ 17 февраля 2020

Вот моя попытка. Вы можете использовать окно, чтобы отделить предложение, посчитав число . для следующих строк.

import org.apache.spark.sql.expressions.Window
val w = Window.orderBy("start_time").rowsBetween(Window.currentRow, Window.unboundedFollowing)
val df = Seq((132, 135, "Hi"),
             (135, 135, ","),
             (143, 152, "I"),
             (151, 152, "am"),
             (159, 169, "working"),
             (194, 197, "on"),
             (204, 211, "hadoop"),
             (211, 211, "."),
             (218, 212, "This"),
             (226, 229, "is"), 
             (234, 239, "Spark"),
             (245, 249, "DF"),
             (253, 258, "coding"),
             (258, 258, "."),
             (276, 276, "I")).toDF("start_time", "end_time", "words")
df.withColumn("count", count(when(col("words") === ".", true)).over(w))
  .groupBy("count")
  .agg(min("start_time").as("start_time"), max("end_time").as("end_time"), concat_ws(" ", collect_list("words")).as("Sentences"))
  .drop("count").show(false)

Затем это даст вам следующий результат, но в нем есть пробелы между словами и , или . следующим образом:

+----------+--------+-----------------------------+
|start_time|end_time|Sentences                    |
+----------+--------+-----------------------------+
|132       |211     |Hi , I am working on hadoop .|
|218       |258     |This is Spark DF coding .    |
|276       |276     |I                            |
+----------+--------+-----------------------------+
0 голосов
/ 17 февраля 2020

Вот мой подход с использованием udf без оконной функции.

val df=Seq((123,245,"Hi"),(123,245,"."),(123,245,"Hi"),(123,246,"I"),(123,245,".")).toDF("start","end","words")

  var count=0
  var flag=false
  val counterUdf=udf((dot:String) => {
    if(flag) {
      count+=1
    flag=false
    }
    if (dot == ".")
      flag=true
    count
  })

  val df1=df.withColumn("counter",counterUdf(col("words")))

  val df2=df1.groupBy("counter").agg(min("start").alias("start"), max("end").alias("end"), concat_ws(" ", collect_list("words")).alias("sentence")).drop("counter")

  df2.show()

+-----+---+--------+
|start|end|sentence|
+-----+---+--------+
|  123|246|  Hi I .|
|  123|245|    Hi .|
+-----+---+--------+
...