Scala на Spark [отфильтровывать повторяющиеся строки после Self Join] - PullRequest
0 голосов
/ 25 января 2019

Я пытался отфильтровать данные, используя python

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Jack  | 24    | 210040   | James  | 23    | 200000   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1
| John  | 25    | 210000   | Irene  | 25    | 200012   |  0

row1 и row2 также являются дубликатами row3 и строки6 являются дубликатами
name_x == name_y , age_x == age_y , salary_x == salary_y
и не учитывая age_diff , который является Output .Мне нужно отфильтровать их, [один из дубликатов строки].

Нужен конечный вывод: как показано ниже: отфильтровывать дубликаты

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1

В python реализовано, как показано ниже,возвращает индекс дубликатов, а также, который слишком медленный.

def duplicate_index(df):
    length = len(df.columns) - 1 # -1 for removing the time difference
    length = length//2
    nrows = df.shape[0]
    duplicate_index = [] 
    for row in range(nrows-1):
        count  = 0
        for frow in range(row+1,nrows):
            if (list(df.iloc[row][:length]) == list(df.iloc[frow][length:-1])):
                if (list(df.iloc[row][length:-1]) == list(df.iloc[frow][:length])):
                    duplicate_index.append(frow)
                    #print(row, frow)
                    count = count + 1
            if count == 1:
                break
    return duplicate_index
del_index = duplicate_index(df)
final_df  = df.drop(index = del_index)

Но теперь мне пришлось делать это на Scala, используя spark, есть ли какой-нибудь более быстрый способ приблизиться к этим фильтрам или что-то вроде смещение в питоне.или окно на Scala

Ответы [ 2 ]

0 голосов
/ 25 января 2019

Я думаю, что ответ astro_asz - более чистый подход, но для полноты, вот как это сделать с помощью окна:

РЕДАКТИРОВАТЬ: я изменил набор данных, чтобы два человека имели одинаковое имя и добавил уникальный идентификаторна основе содержимого каждой строки

val people = Seq(
  ("1", "James", 23, 200000),
  ("1", "James", 24, 210040),  // two people with same name
  ("2", "Irene", 25, 200012),
  ("2", "John",  25, 210000),
  ("3", "Johny", 26,  21090),
  ("3", "Elon",  29, 200000),
  ("4", "Josh",  24, 200000),
  ("4", "David", 23, 200000))

val columns = Seq("ID", "name", "age", "salary")
val df = people.toDF(columns:_*)

// In general you want to use the primary key from the underlying data store
// as your unique keys.  If for some weird reason the primary key is not 
// available or does not exist, you can try to create your own.  This
// is fraught with danger.  If you are willing to make the (dangerous)
// assumption a unique row is enough to uniquely identify the entity in
// that row, you can use a md5 hash of the contents of the row to create
// your id
val withKey = df.withColumn("key", md5(concat(columns.map(c => col(c)):_*)))

val x = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "x_" + c):_*)
val y = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "y_" + c):_*)

val partition = Window.partitionBy("ID").orderBy("x_key")
val df2 = x.join(y, Seq("ID"))
  .where('x_key =!= 'y_key)
  .withColumn("rank", rank over partition)
  .where('rank === 1)
  .drop("rank", "x_key", "y_key")

df2.show
/*-+------+-----+--------+------+-----+--------+                         
|ID|x_name|x_age|x_salary|y_name|y_age|y_salary|
+--+------+-----+--------+------+-----+--------+
| 3|  Elon|   29|  200000| Johny|   26|   21090|
| 1| James|   24|  210040| James|   23|  200000|
| 4| David|   23|  200000|  Josh|   24|  200000|
| 2| Irene|   25|  200012|  John|   25|  210000|
+--+------+-----+--------+------+-----+-------*/
0 голосов
/ 25 января 2019

Вы можете добавить в объединение дополнительное условие, которое содержит только одну из двух строк, например name_x

Пример кадра данных:

  val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
    Seq(
      Row(1, "James",  1, 10),
      Row(1, "Jack",   2, 20),
      Row(2, "Tom",    3, 30),
      Row(2, "Eva",    4, 40)
    )
  )

  val schema: StructType = new StructType()
    .add(StructField("id",      IntegerType,  false))
    .add(StructField("name",    StringType,  false))
    .add(StructField("age",     IntegerType, false))
    .add(StructField("salary",  IntegerType, false))

  val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

  df0.sort("id").show()

, который дает:

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|James|  1|    10|
|  1| Jack|  2|    20|
|  2|  Eva|  4|    40|
|  2|  Tom|  3|    30|
+---+-----+---+------+

Переименовать столбцы кадра данных:

val df1 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_x"))
val df2 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_y"))

Затем выполните объединение с тремя условиями:

val df3 = df1.join(df2,
    col("id_x") === col("id_y") and
    col("name_x") =!= col("name_y") and
    col("name_x") < col("name_y"),
    "inner")
df3.show()

, которое возвращает

+----+------+-----+--------+----+------+-----+--------+                         
|id_x|name_x|age_x|salary_x|id_y|name_y|age_y|salary_y|
+----+------+-----+--------+----+------+-----+--------+
|   1|  Jack|    2|      20|   1| James|    1|      10|
|   2|   Eva|    4|      40|   2|   Tom|    3|      30|
+----+------+-----+--------+----+------+-----+--------+

В зависимости от того, как вы определяете дубликат в ваших данных, условие, которое различает два дубликата, будет различным.

...