проверить, находится ли значение одного столбца между диапазоном другого столбца (массива) в кадре данных - PullRequest
0 голосов
/ 27 мая 2019

У меня есть фрейм данных, в котором мне нужно сравнить несколько значений и вывести некоторые из них.

Например,

my DF

CITY DAY MONTH TAG RANGE     VALUE  RANK
A    1    01    A   [50, 90]   55     1
A    2    02    B   [30, 40]   34     3
A    1    03    A   [05, 10]   15    20
A    1    04    B   [50, 60]   11    10 
A    1    05    B   [50, 60]   54    4 

Я должен проверить для каждой строки, находится ли значение "VALUE" между "RANGE".Здесь arr [0] - нижний предел, а arr [1] - верхний предел.

Мне нужно создать новый DF, такой, что

NEW-DF

TAG  Positive  Negative
A     1          1
B     2          1 
  1. Если «значение» находится между заданным диапазономи ранг <5, тогда я бы добавил его к «положительному» </p>

  2. Если значение не лежит в данном диапазоне, то оно отрицательное

  3. Если значение находится в заданном диапазоне, но ранг> 5, то я бы посчитал его отрицательным

«Положительный» и «Отрицательный» - это ничто иное, как числозначения, удовлетворяющие любому из условий.

Ответы [ 2 ]

1 голос
/ 27 мая 2019

Мы можем использовать element_at, чтобы получить элементы в каждой позиции и сравнить их с соответствующим значением в каждой строке вместе с условием ранга, а затем выполнить groupby с sum для тега:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

range_df = df.withColumn('in_range', (F.element_at('range', 1).cast(IntegerType()) < F.col('value')) & 
                                     (F.col('value') < F.element_at('range', 2).cast(IntegerType())) &
                                     (F.col('rank') < 5))

range_df.show()

grouped_df = range_df.groupby('tag').agg(F.sum(F.col('in_range').cast(IntegerType())).alias('total_positive'), 
                                         F.sum((~F.col('in_range')).cast(IntegerType())).alias('total_negative'))

grouped_df.show()

Вывод:

+---+--------+-----+----+--------+
|tag|   range|value|rank|in_range|
+---+--------+-----+----+--------+
|  A|[50, 90]|   55|   1|    true|
|  B|[30, 40]|   34|   3|    true|
|  A|[05, 10]|   15|  20|   false|
|  B|[50, 60]|   11|  10|   false|
|  B|[50, 60]|   54|   4|    true|
+---+--------+-----+----+--------+

+---+--------------+--------------+
|tag|total_positive|total_negative|
+---+--------------+--------------+
|  B|             2|             1|
|  A|             1|             1|
+---+--------------+--------------+
0 голосов
/ 27 мая 2019

Сначала вы должны использовать UDF для обработки диапазона:

val df = Seq(("A","1","01","A","[50,90]","55","1")).toDF("city","day","month","tag","range","value","rank")

+----+---+-----+---+-------+-----+----+
|city|day|month|tag|  range|value|rank|
+----+---+-----+---+-------+-----+----+
|   A|  1|   01|  A|[50,90]|   55|   1|
+----+---+-----+---+-------+-----+----+


  def checkRange(range : String,rank : String, value : String) : String = {
    val rangeProcess = range.dropRight(1).drop(1).split(",")
    if (rank.toInt > 5){
      "negative"
    } else {
      if (value > rangeProcess(0) && value < rangeProcess(1)){
        "positive"
      } else {
        "negative"
      }
    }
  }

  val checkRangeUdf = udf(checkRange _)

df.withColumn("Result",checkRangeUdf(col("range"),col("rank"),col("value"))).show()

+----+---+-----+---+-------+-----+----+--------+
|city|day|month|tag|  range|value|rank|  Result|
+----+---+-----+---+-------+-----+----+--------+
|   A|  1|   01|  A|[50,90]|   55|   1|positive|
+----+---+-----+---+-------+-----+----+--------+


val result = df.withColumn("Result",checkRangeUdf(col("range"),col("rank"),col("value"))).groupBy("city","Result").count.show

+----+--------+-----+
|city|  Result|count|
+----+--------+-----+
|   A|positive|    1|
+----+--------+-----+
...