Выбор «Эксклюзивных строк» ​​из фрейма данных PySpark - PullRequest
1 голос
/ 20 апреля 2020

У меня есть такой фрейм данных PySpark:

+----------+-----+
|account_no|types|
+----------+-----+
|         1|    K|
|         1|    A|
|         1|    S|
|         2|    M|
|         2|    D|
|         2|    S|
|         3|    S|
|         3|    S|
|         4|    M|
|         5|    K|
|         1|    S|
|         6|    S|
+----------+-----+

, и я пытаюсь выбрать номера счетов, для которых существует исключительно «S». Например: даже если «1» имеет тип = «S», я не буду выбирать его, потому что он также имеет другие типы. Но я выберу 3 и 6, потому что у них есть только один тип «S».

Что я сейчас делаю, так это: - сначала получите все учетные записи, для которых существует «K», и удалите их; который в этом примере удаляет «1» и «5» - второй - найти все учетные записи, для которых существует «D», и удалить их, который удаляет «2» - третий - найти все учетные записи, для которых существует «M», и удалить «4» ( «2» также получил «М», но он был удален на шаге 2). В-четвертых, найдите все учетные записи, для которых существует «А», и удалите их

Итак, теперь «1», «2», « 4 и 5 удаляются, и я получаю «3» и «6» с эксклюзивным «S».

Но это долгий процесс, как мне его оптимизировать? Спасибо

Ответы [ 4 ]

2 голосов
/ 20 апреля 2020

Другой альтернативой является подсчет различных по окну, а затем filter, где Distinct count == 1 и types == S, для упорядочивания можно назначить монотонно увеличивающийся идентификатор, а затем упорядочить по тому же.

from pyspark.sql import functions as F
W = Window.partitionBy('account_no')

out = (df.withColumn("idx",F.monotonically_increasing_id())
   .withColumn("Distinct",F.approx_count_distinct(F.col("types")).over(W)).orderBy("idx")
   .filter("Distinct==1 AND types =='S'")).drop('idx','Distinct')

out.show()

+----------+-----+
|account_no|types|
+----------+-----+
|         3|    S|
|         3|    S|
|         6|    S|
+----------+-----+
2 голосов
/ 20 апреля 2020

Один из способов сделать это - использовать функции Window. Сначала мы получаем sum из числа S в каждой account_no группировке . Затем мы сравниваем это с общим количеством записей для этого group в фильтре, если они соответствуют мы keep that number.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("account_no")
w1=Window().partitionBy("account_no").orderBy("types")

df.withColumn("sum_S", F.sum(F.when(F.col("types")=='S', F.lit(1)).otherwise(F.lit(0))).over(w))\
  .withColumn("total", F.max(F.row_number().over(w1)).over(w))\
  .filter('total=sum_S').drop("total","Sum_S").show()

#+----------+-----+
#|account_no|types|
#+----------+-----+
#|         6|    S|
#|         3|    S|
#|         3|    S|
#+----------+-----+
1 голос
/ 20 апреля 2020

Вы можете просто определить количество различных типов учетной записи, а затем отфильтровать учетные записи «S», которые имеют только 1 тип.

Вот мой код для этого:

from pyspark.sql.functions import countDistinct

data = [(1, 'k'),
        (1, 'a'),
        (1, 's'),
        (2, 'm'),
        (2, 'd'),
        (2, 's'),
        (3, 's'),
        (3, 's'),
        (4, 'm'),
        (5, 'k'),
        (1, 's'),
        (6, 's')]

df = spark.createDataFrame(data, ['account_no', 'types']).distinct()

exclusive_s_accounts = (df.groupBy('account_no').agg(countDistinct('types').alias('distinct_count'))
                        .join(df, 'account_no')
                        .where((col('types') == 's') & (col('distinct_count') == 1))
                        .drop('distinct_count'))
0 голосов
/ 20 апреля 2020

Другой альтернативный подход может заключаться в том, чтобы получить все типы в одном столбце, а затем применить операции фильтрации, чтобы исключить значения, отличные от «S».

from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collectivist
from pyspark.sql.functions import col
df = spark.read.csv("/Users/Downloads/account.csv", header=True, inferSchema=True, sep=",")
type_df = df.groupBy("account_no").agg(concat_ws(",",     collect_list("types")).alias("all_types")).select(col("account_no"),     col("all_types"))

+----------+---------+
|account_no|all_types|
+----------+---------+
|         1|  K,A,S,S|
|         6|        S|
|         3|      S,S|
|         5|        K|
|         4|        M|
|         2|    M,D,S|
+----------+---------+

further filtering using regular expression
only_s_df =  type_df.withColumn("S_status",F.col("all_types").rlike("K|A|M|D"))
only_s_df.show()
+----------+---------+----------+
|account_no|all_types|S_status  |
+----------+---------+----------+
|         1|  K,A,S,S|      true|
|         6|        S|     false|
|         3|      S,S|     false|
|         5|        K|      true|
|         4|        M|      true|
|         2|    M,D,S|      true|
+----------+---------+----------+

надеюсь, что таким образом вы сможете получить ответ и далее обработка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...