Ранжирование рядов PySpark, взвешенных по соотношению целей - PullRequest
1 голос
/ 25 июня 2019

Я хочу повторно набрать набранный набор студентов, чтобы гарантировать, что я взял N студентов из верхней части моего списка я бы получил хотя бы часть в определенной категории.

Так что, если бы у нас был ввод этого кадра данных

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         A|female|100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
|         I|  male| 22.0|
|         J|  male| 11.0|
+----------+------+-----+

И наша цель состояла в том, чтобы в любом случае стать частью моего населения .2 мужчин, я бы сказал это так:

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         I|  male| 22.0|
|         A|female|100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         J|  male| 11.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
+----------+------+-----+

Теперь, если я возьму своих лучших 1, 2, 3, 4, 5 ... 10 учеников из моего населения, я гарантирую, что бью своих .2 соотношение мужчин, но по-прежнему принимает мужчин в порядке от лучшего к худшему.

И хотя мои самки немного поправились, я все равно следю за тем, чтобы их принимали в порядке от лучшего к худшему.

Вот еще несколько примеров.


Ввод

+----------+------+-----+          
|STUDENT_ID|  TYPE|SCORE|          
+----------+------+-----+          
|         A|female|100.0|          
|         B|female| 99.0|          
|         C|female| 88.0|          
|         D|female| 77.0|          
|         E|female| 66.0|          
|         F|female| 55.0|          
|         G|female| 44.0|          
|         H|female| 33.0|          
|         I|  male| 22.0|          
|         J|  male| 11.0|          
+----------+------+-----+         

Выход со 100% должен быть мужским, поэтому все они перемещаются наверх

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         I|  male| 22.0|
|         J|  male| 11.0|
|         A|female|100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
+----------+------+-----+

Input

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         A|male  |100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
|         I|  male| 22.0|
|         J|  male| 11.0|
+----------+------+-----+

Выход с 20% должен быть мужским, но один был уже на месте, поэтому нам нужно только двигаться 1

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         A|male  |100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         I|  male| 22.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
|         J|  male| 11.0|
+----------+------+-----+

Вот код, который работает для некоторых случаев, но не для других.

Он принимает входной фрейм данных, ранжирует его, ранжирует по типу, а затем корректирует ранжирование на основе желаемого соотношения.

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
import pyspark.sql.functions as f

temp_struct = StructType([
    StructField('STUDENT_ID',  StringType()),
    StructField('TYPE',  StringType()),
    StructField('SCORE',  DoubleType())
])


temp_df = spark.createDataFrame([
    ['A',  'female', 100.0],
    ['B',  'female', 99.0],
    ['C',  'female', 88.0],
    ['D',  'female', 77.0],
    ['E',  'female', 66.0],
    ['F',  'female', 55.0],
    ['G',  'female', 44.0],
    ['H',  'female', 33.0],
    ['I',  'male', 22.0],
    ['J',  'male', 11.0]
], temp_struct)

print('Initial DF')
temp_df.show()

window_by_score_desc = Window.orderBy(f.col('SCORE').desc())
temp_df = temp_df.withColumn('RANK', f.row_number().over(window_by_score_desc)).orderBy(f.col('RANK').asc())
print('With RANK DF')
temp_df.show()

window_by_type_rank = Window.partitionBy(f.col('TYPE')).orderBy(f.col('RANK').asc())
temp_df = temp_df.withColumn('TYPE_RANK', f.row_number().over(window_by_type_rank)).orderBy(f.col('RANK').asc())
print('With TYPE RANK DF')
temp_df.show()

def weight_for_type_and_ratio(input_df, student_type, student_ratio):
    section_size = float(1 / student_ratio)
    return input_df.withColumn('ADJUSTED_RANK', 
                               f.when(f.col('TYPE') == student_type, 
                                       (f.col('TYPE_RANK') - 1) * (section_size-1) + .5).otherwise(f.col('RANK')))


print('FINAL WITH ADJUSTED RANK DF')
weight_for_type_and_ratio(temp_df, 'male', .2).orderBy(f.col('ADJUSTED_RANK').asc()).show()

И этот код работает в некоторых случаях .... Входной сигнал:

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         A|female|100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
|         I|  male| 22.0|
|         J|  male| 11.0|
+----------+------+-----+

Что дает правильно скорректированный ранжированный вывод

+----------+------+-----+----+---------+-------------+
|STUDENT_ID|  TYPE|SCORE|RANK|TYPE_RANK|ADJUSTED_RANK|
+----------+------+-----+----+---------+-------------+
|         I|  male| 22.0|   9|        1|          0.5|
|         A|female|100.0|   1|        1|          1.0|
|         B|female| 99.0|   2|        2|          2.0|
|         C|female| 88.0|   3|        3|          3.0|
|         D|female| 77.0|   4|        4|          4.0|
|         J|  male| 11.0|  10|        2|          4.5|
|         E|female| 66.0|   5|        5|          5.0|
|         F|female| 55.0|   6|        6|          6.0|
|         G|female| 44.0|   7|        7|          7.0|
|         H|female| 33.0|   8|        8|          8.0|
+----------+------+-----+----+---------+-------------+

Но не для других случаев, в частности, когда некоторые записи уже имеются и не нуждаются в корректировке.

Вход DF: Начальный ДФ

+----------+------+-----+
|STUDENT_ID|  TYPE|SCORE|
+----------+------+-----+
|         A|  male|100.0|
|         B|female| 99.0|
|         C|female| 88.0|
|         D|female| 77.0|
|         E|female| 66.0|
|         F|female| 55.0|
|         G|female| 44.0|
|         H|female| 33.0|
|         I|  male| 22.0|
|         J|  male| 11.0|
+----------+------+-----+

Что дает неправильный вывод:

+----------+------+-----+----+---------+-------------+
|STUDENT_ID|  TYPE|SCORE|RANK|TYPE_RANK|ADJUSTED_RANK|
+----------+------+-----+----+---------+-------------+
|         A|  male|100.0|   1|        1|          0.5|
|         B|female| 99.0|   2|        1|          2.0|
|         C|female| 88.0|   3|        2|          3.0|
|         D|female| 77.0|   4|        3|          4.0|
|         I|  male| 22.0|   9|        2|          4.5|
|         E|female| 66.0|   5|        4|          5.0|
|         F|female| 55.0|   6|        5|          6.0|
|         G|female| 44.0|   7|        6|          7.0|
|         H|female| 33.0|   8|        7|          8.0|
|         J|  male| 11.0|  10|        3|          8.5|
+----------+------+-----+----+---------+-------------+

Там, где скорректированный ранг мужчины I слишком высок.

Есть мысли о другом подходе к этой проблеме. Не нужно много изменений в коде, возможно, просто другой мыслительный процесс.

1 Ответ

0 голосов
/ 26 июня 2019

В случае, если вы хотите убедиться, что когда вы берете N студентов, вы получаете определенную часть определенной категории, я думаю, что есть гораздо более четкое решение с использованием limit . Посмотрите на код ниже:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
import pyspark.sql.functions as F

temp_struct = StructType([
    StructField('STUDENT_ID',  StringType()),
    StructField('TYPE',  StringType()),
    StructField('SCORE',  DoubleType())
])


temp_df = spark.createDataFrame([
    ['A',  'female', 100.0],
    ['B',  'female', 99.0],
    ['C',  'female', 88.0],
    ['D',  'female', 77.0],
    ['E',  'female', 66.0],
    ['F',  'female', 55.0],
    ['G',  'female', 44.0],
    ['H',  'female', 33.0],
    ['I',  'male', 22.0],
    ['J',  'male', 11.0]
], temp_struct)

#Total number of students you want to get 
total = 5
#portion of the category
fractionMale = 0.2

#simply selecting and limiting the rows for each category and using a union to get a single dataframe
temp_df.filter(temp_df.TYPE == 'male').limit(int(total * fractionMale)).union(temp_df.filter(temp_df.TYPE == 'female').limit(int(total * (1-fractionMale)))).show()

Выход:

+----------+------+-----+ 
|STUDENT_ID|  TYPE|SCORE| 
+----------+------+-----+ 
|         I|  male| 22.0| 
|         A|female|100.0| 
|         B|female| 99.0| 
|         C|female| 88.0| 
|         D|female| 77.0| 
+----------+------+-----+

К сожалению, мы не можем использовать sampleby , поскольку Spark использует Bernoulli_sampling , и, несомненно, вы получите ожидаемые итоги по каждой категории. То есть следующее не всегда вернет вам 5 строк с ожидаемыми дробями.

total = 5
fractionMale = 0.2
countMale = temp_df.filter(temp_df.TYPE == 'male').count()
countFemale = temp_df.count() - countMale

sampleFractionMale = (total * fractionMale)/countMale

sampleFractionFemale = (total * (1 - fractionMale))/countFemale

temp_df.sampleBy("TYPE", fractions={'male': sampleFractionMale, 'female': sampleFractionFemale}).show()

Выход:

+----------+------+-----+ 
|STUDENT_ID| TYPE|SCORE| 
+----------+------+-----+ 
|         A|female|100.0| 
|         B|female| 99.0| 
|         C|female| 88.0| 
|         D|female| 77.0| 
|         F|female| 55.0| 
|         I|  male| 22.0| 
+----------+------+-----+
...