Повторяющаяся запись перемещена на другой фрейм данных в Spark Scala - PullRequest
1 голос
/ 14 октября 2019

Фрейм данных имеет 3 миллиона записей. Я пытаюсь переместить только дубликаты записей в отдельный фрейм данных. Я использую spark 1.6 с scala Данные

IM,A-15ACWSSC,CP
IM,A-15ACWSSC,CP
IM,AK11-130BA,13MM BLK RUBBER CAB

Новый DataFrame

IM,A-15ACWSSC,CP
IM,A-15ACWSSC,CP

Код, который я использовал

var df = Seq(
      ("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
      ("IM", "A-15ACWSSC", "ASSY 1.5V2", "CP"),
      ("IN", "A-15ACWSSC", "ASSY 1.6V2", "CP1"),
      ("IN", "A-15ACWSSC", "ASSY 1.7V2", "CP2")
    ).toDF("COL1", "COL2", "COL3", "COL4")

    df.show()

    // +----+----------+----------+----+
    // |COL1|      COL2|      COL3|COL4|
    // +----+----------+----------+----+
    // |  IM|A-15ACWSSC|ASSY 1.5V2|  CP|
    // |  IM|A-15ACWSSC|ASSY 1.5V2|  CP|
    // |  IN|A-15ACWSSC|ASSY 1.6V2| CP1|
    // |  IN|A-15ACWSSC|ASSY 1.7V2| CP2|
    // +----+----------+----------+----+

    df.registerTempTable("CLEANFRAME")

    val CleanData = sqlContext.sql(
      """select COL1,COL2,COL3,COL4
                              from
                              (SELECT  COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
                              FROM CLEANFRAME)
                              where Uniqueid > 1
                              """).cache()
    CleanData.show

Но этоне дает никакого результата. Пожалуйста, помогите, если я что-то упустил.

Ответы [ 2 ]

1 голос
/ 14 октября 2019

Вы должны быть изменены, как показано ниже. Каждый столбец должен быть включен в группу по.

Редактировать : Используемые записи окон и дубликаты сохраняются.

 var df = Seq(
       ("IM","A-15ACWSSC","ASSY 1.5V2","CP"),
       ("IM","A-15ACWSSC","ASSY 1.5V2","CP"),
       ("IN","A-15ACWSSC","ASSY 1.6V2","CP1"),
       ("IN","A-15ACWSSC","ASSY 1.7V2","CP2")
   ).toDF("COL1", "COL2","COL3","COL4")

 df.show()

 // +----+----------+----------+----+
 // |COL1|      COL2|      COL3|COL4|
 // +----+----------+----------+----+
 // |  IM|A-15ACWSSC|ASSY 1.5V2|  CP|
 // |  IM|A-15ACWSSC|ASSY 1.5V2|  CP|
 // |  IN|A-15ACWSSC|ASSY 1.6V2| CP1|
 // |  IN|A-15ACWSSC|ASSY 1.7V2| CP2|
 // +----+----------+----------+----+


 df.createOrReplaceTempView("CLEANFRAME")

val CleanData= sqlContext.sql("""select COL1,COL2,COL3,COL4
                                  from 
                                  (SELECT  COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
                                  FROM CLEANFRAME)
                                  where Uniqueid > 1
                                  """ ).cache()

Ошибка:

Exception in thread "main" java.lang.RuntimeException: [3.79] failure: ``)'' expected but `(' found

                                  (SELECT  COL1,COL2,COL3,COL4, count(1) over (partition by COL1,COL2,COL3,COL4) as Uniqueid
0 голосов
/ 14 октября 2019

Вы можете попробовать это

scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
scala> var win = Window.partitionBy("a","b","c","d").orderBy("a")

scala> var dff = Seq(("IM","A-15ACWSSC","ASSY 1.5V2","CP"), ("IM","A-15ACWSSC","ASSY 1.5V2","CP"), ("IM","AK11-130BA","13MM BLK RUBBER CAB FOOT","ap")).toDF("a","b","c","d")

scala> dff.show
+---+----------+--------------------+---+
|  a|         b|                   c|  d|
+---+----------+--------------------+---+
| IM|A-15ACWSSC|          ASSY 1.5V2| CP|
| IM|A-15ACWSSC|          ASSY 1.5V2| CP|
| IM|AK11-130BA|13MM BLK RUBBER C...| ap|
+---+----------+--------------------+---+
for finding duplicates and based on that filter whose value is >= 2

scala> var dff_dup = dff.withColumn("test",count("*").over(win)).filter($"test">=2)


scala> dff_dup.show
+---+----------+----------+---+----+
|  a|         b|         c|  d|test|
+---+----------+----------+---+----+
| IM|A-15ACWSSC|ASSY 1.5V2| CP|   2|
| IM|A-15ACWSSC|ASSY 1.5V2| CP|   2|
+---+----------+----------+---+----+
...