Как разделить набор данных на два набора данных с уникальными и повторяющимися строками в каждом? - PullRequest
0 голосов
/ 22 ноября 2018

Я хочу получить дубликаты записей в кадре данных Spark Scala.Например, я хочу получить дубликаты значений на основе 3 столбцов, таких как «id», «name», «age». Часть условия не содержит ни одного столбца (динамический ввод).основываясь на значении столбца, я хочу взять дубликаты записей.

приведенный ниже код, который я пробовал.только один атрибут я пробовал.Я не знаю, как это сделать, если более одного столбца.

Мой код:

 var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
 var columnNames= s.replace('|', ',')


val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
      .over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input) 
      .where($"count">1)
      .show()

Входной фрейм данных: (findDuplicateRecordsDF.show ())

       --------------------------------------------------------
       |  id   |  name | age |  phone      | email_id          |
       |-------------------------------------------------------|  
       |  3    | sam   | 23  |  9876543210 | sam@yahoo.com     | 
       |  7    | ram   | 27  |  8765432190 | ram@gmail.com     |
       |  3    | sam   | 28  |  9876543210 | sam@yahoo.com     | 
       |  6    | haris | 30  |  6543210777 | haris@gmail.com   |
       |  9    | ram   | 27  |  8765432130 | ram94@gmail.com   |
       |  6    | haris | 24  |  6543210777 | haris@gmail.com   | 
       |  4    | karthi| 26  |  4321066666 | karthi@gmail.com  | 
       --------------------------------------------------------

здесь я собираюсь взять дублирующиеся записи на основе 4 столбцов (идентификатор, имя, телефон, адрес электронной почты).Выше приведен пример фрейма данных.исходный фрейм данных не содержит ни одного столбца.

Выходной фрейм данных должен быть

  1. Выход дубликатов записей

           --------------------------------------------------------
           |  id   |  name | age |  phone      | email_id          |
           |-------------------------------------------------------|  
           |  3    | sam   | 23  |  9876543210 | sam@yahoo.com     | 
           |  3    | sam   | 28  |  9876543210 | sam@yahoo.com     | 
           |  6    | haris | 30  |  6543210777 | haris@gmail.com   |
           |  6    | haris | 24  |  6543210777 | haris@gmail.com   | 
            --------------------------------------------------------
    
  2. Вывод данных из уникальных записей:

          --------------------------------------------------------
         |  id   |  name | age |  phone      | email_id          |
         |-------------------------------------------------------|  
         |  7    | ram   | 27  |  8765432190 | ram@gmail.com     |
         |  9    | ram   | 27  |  8765432130 | ram94@gmail.com   |
         |  4    | karthi| 26  |  4321066666 | karthi@gmail.com  | 
          --------------------------------------------------------
    

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 22 ноября 2018

Вы можете использовать оконные функции.Проверьте это

scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name  |age|phone     |email_id        |cnt|
+---+------+---+----------+----------------+---+
|4  |karthi|26 |4321066666|karthi@gmail.com|1  |
|7  |ram   |27 |8765432190|ram@gmail.com   |1  |
|9  |ram   |27 |8765432130|ram94@gmail.com |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |2  |
|6  |haris |30 |6543210777|haris@gmail.com |2  |
|6  |haris |24 |6543210777|haris@gmail.com |2  |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")

// Дубликаты

scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+

// Уникальный

scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id|  name|     phone|        email_id|
+---+------+----------+----------------+
|  4|karthi|4321066666|karthi@gmail.com|
|  7|   ram|8765432190|   ram@gmail.com|
|  9|   ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+

РЕДАКТИРОВАТЬ 2:

val df = Seq(
  (4,"karthi",26,"4321066666","karthi@gmail.com"),
  (6,"haris",24,"6543210777","haris@gmail.com"),
  (7,"ram",27,"8765432190","ram@gmail.com"),
  (9,"ram",27,"8765432190","ram@gmail.com"),
  (6,"haris",24,"6543210777","haris@gmail.com"),
  (3,"sam",23,"9876543210","sam@yahoo.com"),
  (3,"sam",23,"9876543210","sam@yahoo.com"),
  (3,"sam",28,"9876543210","sam@yahoo.com"),
  (6,"haris",30,"6543210777","haris@gmail.com")
  ).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show

Результаты:

+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  9|  ram|8765432190|  ram@gmail.com|
+---+-----+----------+---------------+

РЕДАКТИРОВАТЬ3: - Проверка состояния нуля

val df = Seq(
  (4,"karthi",26,"4321066666","karthi@gmail.com"),
  (6,"haris",30,"6543210777","haris@gmail.com"),
  (6,"haris",30,null,"haris@gmail.com"),
  (7,"ram",27,"8765432190","ram@gmail.com"),
  (9,"ram",27,"8765432190","ram@gmail.com"),
  (6,"haris",24,"6543210777","haris@gmail.com"),
  (6,null,24,"6543210777",null),
  (3,"sam",23,"9876543210","sam@yahoo.com"),
  (3,"sam",23,"9876543210","sam@yahoo.com"),
  (3,"sam",28,"9876543210","sam@yahoo.com"),
  (6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count  = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " +  dup_cols_str +  ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)

Результаты:

+---+------+---+----------+----------------+----------+
|id |name  |age|phone     |email_id        |null_count|
+---+------+---+----------+----------------+----------+
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |
|6  |haris |30 |6543210777|haris@gmail.com |3         |
|6  |haris |30 |null      |haris@gmail.com |2         |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
|6  |null  |24 |6543210777|null            |1         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |
|6  |haris |24 |6543210777|haris@gmail.com |3         |
+---+------+---+----------+----------------+----------+


|id |name  |age|phone     |email_id        |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6  |haris |30 |6543210777|haris@gmail.com |3         |3  |1  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |2  |
|6  |haris |24 |6543210777|haris@gmail.com |3         |3  |3  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |3         |3  |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |3         |3  |3  |
|7  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |1  |
|9  |ram   |27 |8765432190|ram@gmail.com   |3         |2  |2  |
|4  |karthi|26 |4321066666|karthi@gmail.com|3         |1  |1  |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone     |email_id       |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6  |24 |
|haris|6543210777|haris@gmail.com|6  |24 |
|sam  |9876543210|sam@yahoo.com  |3  |23 |
|sam  |9876543210|sam@yahoo.com  |3  |28 |
|ram  |8765432190|ram@gmail.com  |9  |27 |
+-----+----------+---------------+---+---+

blankcheck

val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )

Фильтровать только тогда, когда все 3 столбца либо пустые, либо нулевые

val df = Seq(
  (4,"karthi",26,"4321066666","karthi@gmail.com"),
  (6,"haris",30,"6543210777","haris@gmail.com"),
  (6,null,30,null,null),
  (7,"ram",27,"8765432190","ram@gmail.com"),
  (9,"",27,"",""),
  (7,"ram",27,"8765432190","ram@gmail.com"),
  (6,"haris",24,"6543210777","haris@gmail.com"),
  (6,null,24,"6543210777",null),
  (3,"sam",23,"9876543210","sam@yahoo.com"),
  (3,null,23,"9876543210","sam@yahoo.com"),
  (3,null,28,"9876543213",null),
  (6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^\s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count  = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count  !=  0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " +  dup_cols_str +  ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
0 голосов
/ 22 ноября 2018

Вам нужно дать имена столбцов через запятую.

col1 ..col2 should be of string type.
     val window= Window.partitionBy(col1,col2,..)


    findDuplicateRecordsDF.withColumn("count", count("*")
          .over(window)
          .where($"count">1)
          .show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...