Pyspark - получение значений из массива с диапазоном минимальных и максимальных значений - PullRequest
0 голосов
/ 15 мая 2019

Я пытаюсь написать запрос в PySpark, который получит правильное значение из массива.

Например, у меня есть dataframe с именем df с тремя столбцами, «companyId», «companySize» и «weightingRange.Столбец «companySize» - это количество сотрудников.Столбец weightingRange представляет собой массив со следующим

[ {"minimum":0, "maximum":100, "weight":123},
  {"minimum":101, "maximum":200, "weight":456},
  {"minimum":201, "maximum":500, "weight":789}
]

, поэтому фрейм данных выглядит следующим образом (weightingRange такой же, как и выше, его усеченный в приведенном ниже примере для более четкого форматирования)

+-----------+-------------+------------------------+--+
| companyId | companySize |     weightingRange     |  |
+-----------+-------------+------------------------+--+
| ABC1      |         150 | [{"maximum":100, etc}] |  |
| ABC2      |          50 | [{"maximum":100, etc}] |  |
+-----------+-------------+------------------------+--+

Таким образом, для записи о размере компании = 150 мне нужно вернуть вес 456 в столбец с названием «companyWeighting»

Таким образом, он должен показать следующее

+-----------+-------------+------------------------+------------------+
| companyId | companySize |     weightingRange     | companyWeighting |
+-----------+-------------+------------------------+------------------+
| ABC1      |         150 | [{"maximum":100, etc}] |              456 |
| ABC2      |          50 | [{"maximum":100, etc}] |              123 |
+-----------+-------------+------------------------+------------------+

взглянул на

df.withColumn("tmp",explode(col("weightingRange"))).select("tmp.*")

, а затем присоединился, но пытался применить, что бы декартово данные.

Предложения приветствуются!

1 Ответ

1 голос
/ 16 мая 2019

Вы можете подойти так,

Первое создание образца кадра данных,

import pyspark.sql.functions as F

df = spark.createDataFrame([
        ('ABC1', 150, [ {"min":0, "max":100, "weight":123},
                        {"min":101, "max":200, "weight":456},
                        {"min":201, "max":500, "weight":789}]),
        ('ABC2', 50, [  {"min":0, "max":100, "weight":123},
                        {"min":101, "max":200, "weight":456},
                        {"min":201, "max":500, "weight":789}])],  

        ['companyId' , 'companySize', 'weightingRange'])

Затем, создав функцию udf и применив ее к каждой строке, чтобы получить новый столбец,

def get_weight(wt,wt_rnge):
    for _d in wt_rnge:
        if _d['min'] <= wt <= _d['max']:
            return _d['weight']

get_weight_udf = F.udf(lambda x,y: get_weight(x,y))
df = df.withColumn('companyWeighting', get_weight_udf(F.col('companySize'), F.col('weightingRange')))
df.show()

Вы получите вывод как,

+---------+-----------+--------------------+----------------+
|companyId|companySize|      weightingRange|companyWeighting|
+---------+-----------+--------------------+----------------+
|     ABC1|        150|[Map(weight -> 12...|             456|
|     ABC2|         50|[Map(weight -> 12...|             123|
+---------+-----------+--------------------+----------------+
...