Построение производного столбца с использованием преобразований Spark - PullRequest
1 голос
/ 23 мая 2019

Я получил запись в таблице, как указано ниже.

Id   Indicator     Date
1       R       2018-01-20
1       R       2018-10-21
1       P       2019-01-22
2       R       2018-02-28
2       P       2018-05-22
2       P       2019-03-05 

Мне нужно выбрать Id с более чем двумя R индикаторами за последний год и получить новый столбец под названиемMarked_Flag как Y в противном случае N.Таким образом, ожидаемый результат должен выглядеть следующим образом:

Id  Marked_Flag 
1   Y
2   N

Итак, что я сделал до сих пор, я взял записи в наборе данных, а затем снова построил другой набор данных из этого.Код выглядит следующим образом.

Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");

Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");

Но я могу привести то, что нужно сделать, используя один набор данных и используя преобразования Spark.Я довольно новичок в Spark, любые советы или фрагменты кода на этот счет были бы очень полезны.

Создал два набора данных, один для получения агрегации, а другой использовал агрегированное значение для получения нового столбца.

Dataset<row> getIndicators = spark.sql("select id, count(indicator) as indi_count from source group by id having indicator = 'R'");

Dataset<row>getFlag = spark.sql("select id, case when indi_count > 1 then 'Y' else 'N' end as Marked_Flag" from getIndicators");

Ввод

Ожидаемый вывод

1 Ответ

1 голос
/ 23 мая 2019

Попробуйте следующее.Обратите внимание, что я использую pyspark DataFrame здесь

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
[1, "R", "2018-01-20"],
[1, "R", "2018-10-21"],
[1, "P", "2019-01-22"],
[2, "R", "2018-02-28"],
[2, "P", "2018-05-22"],
[2, "P", "2019-03-05"]], ["Id", "Indicator","Date"])

gr = df.filter(F.col("Indicator")=="R").groupBy("Id").agg(F.count("Indicator"))
gr = gr.withColumn("Marked_Flag", F.when(F.col("count(Indicator)") > 1, "Y").otherwise('N')).drop("count(Indicator)")
gr.show()

# +---+-----------+
# | Id|Marked_Flag|
# +---+-----------+
# |  1|          Y|
# |  2|          N|
# +---+-----------+
# 
...