Spark Dataframe - выборочные записи случайным образом для каждого дня - PullRequest
3 голосов
/ 02 июля 2019

У меня есть некоторые данные в Hive, где у меня ежедневно около 500k уникальных идентификаторов клиентов.Данные охватывают 2 месяца и делятся на date.Вот как выглядит распределение -

+----------+---------------+
|      date|unique_visitors|
+----------+---------------+
|2019-01-01|        500,000|
|2019-01-02|        500,000|
|2019-01-03|        500,000|
|2019-01-04|        500,000|
|2019-01-05|        500,000|
|2019-01-06|        500,000|
|..........|         ..... |
|2019-02-27|        500,000|
|2019-02-28|        500,000|
+----------+---------------+

Я хотел бы реализовать функцию, которая принимает N в качестве входных данных и дает мне столько записей в день в выходной таблице.

Например, если я задаю N как 250k, тогда я хочу 250k случайных выборок уникальных customerIds для каждого дня для всех 60-дневных данных, чтобы я мог поддерживать согласованность размера аудитории для каждого дня вмоя выходная таблица.

Таким образом, общее количество записей в выходной таблице будет 250k * 60.Вот как будет выглядеть распределение моей выходной таблицы -

+----------+---------------+
|      date|unique_visitors|
+----------+---------------+
|2019-01-01|        250,000|
|2019-01-02|        250,000|
|2019-01-03|        250,000|
|2019-01-04|        250,000|
|2019-01-05|        250,000|
|2019-01-06|        250,000|
|..........|         ..... |
|2019-02-27|        250,000|
|2019-02-28|        250,000|
+----------+---------------+

Как этого добиться с помощью Spark?

1 Ответ

0 голосов
/ 02 июля 2019

Я бы просто использовал оконную функцию partitionBy , чтобы разделить по дате и упорядочить по случайным значениям. Мы добавляем столбец 'rank', используя эту оконную функцию, затем фильтруем по рангу меньше вашего значения 'n' и отбрасываем столбец 'rank'.

import org.apahce.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val n = 250000
val w = Window.partitionBy($"date").orderBy(rand())
val res = df.withColumn("rank", rank().over(w)).filter($"rank" <= n).drop("rank")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...