Как равномерно сэмплировать данные в Pyspark Dataframe, используя windows - PullRequest
0 голосов
/ 22 апреля 2020

Публикация этого после попытки найти решение, которое оказалось бы таким простым на вид, но не нашло адекватного ответа нигде при переполнении стека.

Проблема : у меня есть фрейм данных, который Я хочу сделать выборку равномерно в течение нескольких месяцев. В рамке необработанных данных у меня есть уникальные идентификаторы, которые, естественно, будут повторяться через несколько месяцев. Мне нужно сделать выборку для фрейма данных, чтобы я выбирал только 1 случайный случайный случай из любого уникального идентификатора, и чтобы возвращаемый образец также был равномерным по распределению по месяцам. Наконец, я хочу иметь возможность контролировать размер (n) каждого образца.

Примечания : Прежде чем углубляться в мой подход, важно отметить, что приступим к уникальным наблюдениям Поле id представляет некоторые проблемы. Во-первых, я инстинктивно хотел использовать ранг над окном, чтобы легко выбрать первое или последнее вхождение определенного UniqueId, однако это создало смещение, сохраняя большую часть моих наблюдений в предыдущие месяцы или в более поздние месяцы.

Мне нужно что-то, что применило бы процесс случайного выбора в течение всех месяцев.

Пример смещения : Если мы попытаемся применить ранг или Плотный переход по окну, чтобы перейти к уникальному идентификатору, образец будет каким-то образом смещен. В приведенном ниже примере этого, если мы используем density_rank (), мы получим образец, который смещен в сторону первого вхождения и игнорирует последующие даты, когда может существовать этот UniqueId. Таким образом, вы в конечном итоге создаете смещение еще до применения равномерной выборки, когда выборка уже насыщена уникальными идентификаторами, найденными в более ранние даты:

df_sampled = df \
  .withColumn('Rank', dense_rank().over(w)) \
  .where(col('Rank')==1)) \

Решение:

w = Window.partitionBy('UniqueId')
w2 = Window.partitionBy(trunc("Dates", "month")).orderBy("RandomNum")
n = 10000
seed = 32

# Create a Rank Column using randomly generated numbers
# Use window function to create a column containing the max rank by unique id
# filter rows where the Random Rank == Max Rank to get distinct unique ids
# Next Uniformly sample the df by creating a new RandomNum column over the filterd df
# Create a row number column using the second window that is ordered by the RandomNum
# filter rows where the RowNum columns is <= the size of your uniform sample (n)

df_sampled = df \
  .withColumn('Rank', rand(seed)) \
  .withColumn('MaxRank', max(col('Rank')).over(w)) \
  .where(col('Rank')==col('MaxRank')) \
  .withColumn("RandomNum", rand(seed)) \
  .withColumn("RowNum", row_number().over(w2)) \
  .where(col("RowNum") <= n) \
  .cache()

Сводка : конечный результат дает единую выборку наблюдений, в которой уникальный идентификатор в выборке выбирался случайным образом в диапазоне дат во фрейме данных

1 Ответ

0 голосов
/ 07 мая 2020

ответ выше, не забудьте использовать семя с rand () для воспроизводимых результатов

...