Как уменьшить спарк-фрейм данных до максимального количества строк для каждого значения в столбце? - PullRequest
1 голос
/ 10 января 2020

Мне нужно уменьшить datafame и экспортировать его в паркет. Мне нужно убедиться, что у меня есть экс. 10000 строк для каждого значения в столбце.

Фрейм данных, с которым я работаю, выглядит следующим образом:

+-------------+-------------------+
|         Make|              Model|
+-------------+-------------------+
|      PONTIAC|           GRAND AM|
|        BUICK|            CENTURY|
|        LEXUS|             IS 300|
|MERCEDES-BENZ|           SL-CLASS|
|      PONTIAC|           GRAND AM|
|       TOYOTA|              PRIUS|
|   MITSUBISHI|      MONTERO SPORT|
|MERCEDES-BENZ|          SLK-CLASS|
|       TOYOTA|              CAMRY|
|         JEEP|           WRANGLER|
|    CHEVROLET|     SILVERADO 1500|
|       TOYOTA|             AVALON|
|         FORD|             RANGER|
|MERCEDES-BENZ|            C-CLASS|
|       TOYOTA|             TUNDRA|
|         FORD|EXPLORER SPORT TRAC|
|    CHEVROLET|           COLORADO|
|   MITSUBISHI|            MONTERO|
|        DODGE|      GRAND CARAVAN|
+-------------+-------------------+

Мне нужно вернуть максимум 10000 строк для каждая модель:

+--------------------+-------+
|               Model|  count|
+--------------------+-------+
|                 MDX|1658647|
|               ASTRO| 682657|
|           ENTOURAGE|  72622|
|             ES 300H|  80712|
|            6 SERIES| 145252|
|           GRAN FURY|   9719|
|RANGE ROVER EVOQU...|   4290|
|        LEGACY WAGON|   2070|
|        LEGACY SEDAN|    104|
|  DAKOTA CHASSIS CAB|      8|
|              CAMARO|2028678|
|                  XT|  10009|
|             DYNASTY| 171776|
|                 944|  43044|
|         F430 SPIDER|    506|
|FLEETWOOD SEVENTY...|      6|
|         MONTE CARLO|1040806|
|             LIBERTY|2415456|
|            ESCALADE| 798832|
| SIERRA 3500 CLASSIC|   9541|
+--------------------+-------+

Этот вопрос не тот же, потому что он, как другие предложили ниже, извлекает только те строки, в которых значение больше других значений , Я хочу for each value in df['Model']: limit rows for that value(model) to 10,000 if there are 10,000 or more rows (очевидно, псевдокод). Другими словами, если больше 10000 строк, избавьтесь от остальных, в противном случае оставьте все строки.

Ответы [ 4 ]

1 голос
/ 15 января 2020

Я полагаю, вы должны поставить row_number с window, orderBy и partitionBy, чтобы запросить результат, а затем вы можете фильтровать с вашим лимитом. Например, получение случайного тасования и ограничение выборки до 10000 строк на значение демонстрируется следующим:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window.partitionBy(df['Model']).orderBy(F.rand())
df = df.select(F.col('*'), 
               F.row_number().over(window).alias('row_number')) \
               .where(F.col('row_number') <= 10000)
0 голосов
/ 10 января 2020

Если я понимаю ваш вопрос, вы хотите выбрать несколько строк (например, 10000), но в этих записях должно быть больше 10000. Если я понимаю ваш вопрос, это ответ:

df = df.groupBy('Make', 'Model').agg(count(lit(1)).alias('count'))
df = df.filter(df['count']>10000).select('Model','count')
df.write.parquet('output.parquet')
0 голосов
/ 15 января 2020

Я немного изменю данную проблему, чтобы ее можно было визуализировать, уменьшив максимальное количество строк для каждого отдельного значения до 2 строк (вместо 10 000).

Пример кадра данных:

df = spark.createDataFrame(
  [('PONTIAC', 'GRAND AM'), ('BUICK', 'CENTURY'), ('LEXUS', 'IS 300'), ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('MERCEDES-BENZ', 'SL-CLASS'), ('PONTIAC', 'GRAND AM'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('CHEVROLET', 'SILVERADO 1500'), ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'), ('MERCEDES-BENZ', 'C-CLASS'), ('TOYOTA', 'TUNDRA'), ('TOYOTA', 'PRIUS'), ('MITSUBISHI', 'MONTERO SPORT'), ('MERCEDES-BENZ', 'SLK-CLASS'), ('TOYOTA', 'CAMRY'), ('JEEP', 'WRANGLER'), ('CHEVROLET', 'SILVERADO 1500'), ('TOYOTA', 'AVALON'), ('FORD', 'RANGER'), ('MERCEDES-BENZ', 'C-CLASS'), ('TOYOTA', 'TUNDRA'), ('FORD', 'EXPLORER SPORT TRAC'), ('CHEVROLET', 'COLORADO'), ('MITSUBISHI', 'MONTERO'), ('DODGE', 'GRAND CARAVAN')],
  ['Make', 'Model']
)

Давайте подсчитаем количество строк:

df.groupby('Model').count().collect()

+-------------------+-----+
|              Model|count|
+-------------------+-----+
|             AVALON|    2|
|            CENTURY|    1|
|             TUNDRA|    2|
|           WRANGLER|    3|
|           GRAND AM|    3|
|EXPLORER SPORT TRAC|    1|
|            C-CLASS|    2|
|      MONTERO SPORT|    3|
|              CAMRY|    3|
|      GRAND CARAVAN|    1|
|     SILVERADO 1500|    2|
|              PRIUS|    3|
|            MONTERO|    1|
|           COLORADO|    1|
|             RANGER|    2|
|          SLK-CLASS|    3|
|           SL-CLASS|    2|
|             IS 300|    1|
+-------------------+-----+

Если я правильно понимаю ваш вопрос, вы можете назначить номер строки каждой строке с разделом на Model:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

win_1 = Window.partitionBy('Model').orderBy(desc('Make'))
df = df.withColumn('row_num', row_number().over(win_1))

row_num

А затем отфильтруйте строки в том месте, где row_num <= 2:

df = df.filter(df.row_num <= 2).select('Make', 'Model')

Всего должно быть 2 + 1 + 2 + 2 + 2 + 1 + 2 + 2 + 2 + 1 + 2 + 2 + 1 + 1 + 2 + 2 + 2 + 1 = 30 строк

Окончательные результаты:

+-------------+-------------------+
|         Make|              Model|
+-------------+-------------------+
|       TOYOTA|             AVALON|
|       TOYOTA|             AVALON|
|        BUICK|            CENTURY|
|       TOYOTA|             TUNDRA|
|       TOYOTA|             TUNDRA|
|         JEEP|           WRANGLER|
|         JEEP|           WRANGLER|
|      PONTIAC|           GRAND AM|
|      PONTIAC|           GRAND AM|
|         FORD|EXPLORER SPORT TRAC|
|MERCEDES-BENZ|            C-CLASS|
|MERCEDES-BENZ|            C-CLASS|
|   MITSUBISHI|      MONTERO SPORT|
|   MITSUBISHI|      MONTERO SPORT|
|       TOYOTA|              CAMRY|
|       TOYOTA|              CAMRY|
|        DODGE|      GRAND CARAVAN|
|    CHEVROLET|     SILVERADO 1500|
|    CHEVROLET|     SILVERADO 1500|
|       TOYOTA|              PRIUS|
|       TOYOTA|              PRIUS|
|   MITSUBISHI|            MONTERO|
|    CHEVROLET|           COLORADO|
|         FORD|             RANGER|
|         FORD|             RANGER|
|MERCEDES-BENZ|          SLK-CLASS|
|MERCEDES-BENZ|          SLK-CLASS|
|MERCEDES-BENZ|           SL-CLASS|
|MERCEDES-BENZ|           SL-CLASS|
|        LEXUS|             IS 300|
+-------------+-------------------+
0 голосов
/ 10 января 2020

Просто сделай

import pyspark.sql.functions as F

df = df.groupBy("Model").agg(F.count(F.lit(1)).alias("Count"))
df = df.filter(df["Count"] < 10000).select("Model", "Count")

df.write.parquet("data.parquet")
...