Группировка записей по шаблону значений строк с использованием PySpark - PullRequest
3 голосов
/ 02 апреля 2020

У меня есть таблица с 3 столбцами:

Table A:

+----+----+----------+                                                          
|col1|col2|row_number|
+----+----+----------+
|   X|   1|         1|
|   Y|   0|         2|
|   Z|   2|         3|
|   A|   1|         4|
|   B|   0|         5|
|   C|   0|         6|
|   D|   2|         7|
|   P|   1|         8|
|   Q|   2|         9|
+----+----+----------+

Я хочу объединить строки в "col1", группируя записи на основе значений "col2". «col2» имеет шаблон 1, за которым следует любое число от 0 до 2. Я хочу сгруппировать записи, в которых «col2» начинается с 1 и заканчивается 2 (порядок фрейма данных должен быть сохранен - ​​вы можете использовать столбец row_number для заказа)

Например, первые 3 записи могут быть сгруппированы вместе, потому что "col2" имеет "1-0-2". Следующие 4 записи могут быть сгруппированы вместе, потому что их значения "col2" имеют "1-0-0-2"

Объединение может быть выполнено с использованием "concat_ws" после того, как я сгруппирую эти записи. Но какая-нибудь помощь в том, как сгруппировать эти записи на основе шаблона "1-0s-2"?

Ожидаемый результат:

+----------+
|output_col|
+----------+
|       XYZ|   
|      ABCD|   
|        PQ| 
+----------+

Вы можете использовать следующий код для создания этого примера данных :

schema = StructType([StructField("col1", StringType())\
                   ,StructField("col2", IntegerType())\
                   ,StructField("row_number", IntegerType())])

data = [['X', 1, 1], ['Y', 0, 2], ['Z', 2, 3], ['A', 1, 4], ['B', 0, 5], ['C', 0, 6], ['D', 2, 7], ['P', 1, 8], ['Q', 2, 9]]

df = spark.createDataFrame(data,schema=schema)
df.show()

1 Ответ

2 голосов
/ 02 апреля 2020

Я бы предложил вам использовать window функции. Сначала используйте окно, упорядоченное по row_number, чтобы получить добавочную сумму , равную col2. incremental sum будет иметь кратных 3 , которые в основном будут конечными точками нужной группы . Замените их с лагом того же самого окна , чтобы получить желаемых разделов в incremental_sum. Теперь вы можете groupBy incremental_sum столбец и collect_list. Вы можете array_join ( spark2.4 ) в собранном списке, чтобы получить желаемые строки.

from pyspark.sql import functions as F 
from pyspark.sql.window import Window
w=Window().orderBy("row_number")
df.withColumn("incremental_sum", F.sum("col2").over(w))\
  .withColumn("lag", F.lag("incremental_sum").over(w))\
  .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
  .groupBy("incremental_sum").agg(F.array_join(F.collect_list("col1"),"").alias("output_col")).drop("incremental_sum").show()
+----------+
|output_col|
+----------+
|       XYZ|
|      ABCD|
|        PQ|
+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...