Я переношу старый SQL-код на PySpark и пытаюсь осмыслить логику этого скрипта:
-- Rebuild the feature table from scratch on every run
-- (idempotent DROP IF EXISTS + CREATE IF NOT EXISTS pair).
DROP TABLE IF EXISTS features.weather;
CREATE TABLE IF NOT EXISTS features.weather as (
SELECT
gen_date_weather,
-- Within each grp_* partition only the first (earliest) row is non-NULL, so
-- first_value() broadcasts that value over the NULL rows that follow it:
-- a last-observation-carried-forward ("gaps and islands") fill.
-- NOTE(review): first_value over a partition with no ORDER BY is formally
-- nondeterministic; it works here only because each partition contains
-- exactly one non-NULL value -- confirm the target engine preserves this.
first_value(avg_temperature) over (partition by grp_avg_temperature) as avg_temperature,
first_value(min_temperature) over (partition by grp_min_temperature) as min_temperature,
first_value(max_temperature) over (partition by grp_max_temperature) as max_temperature,
first_value(avg_windspeed) over (partition by grp_avg_windspeed) as avg_windspeed,
first_value(max_windgust) over (partition by grp_max_windgust) as max_windgust,
first_value(max_precipprobability) over (partition by grp_max_precipprobability) as max_precipprobability,
first_value(min_precipprobability) over (partition by grp_min_precipprobability) as min_precipprobability,
-- Normalise the flag to a strict 0/1 indicator (NULL also maps to 0).
case when snowaccumulation_flag > 0 then 1 else 0 end as snowaccumulation_flag
FROM
(
SELECT
CAST(gen_date_weather as DATE) as gen_date_weather,
max_windgust,
max_precipprobability,
min_precipprobability,
avg_temperature,
min_temperature,
max_temperature,
avg_windspeed,
snowaccumulation_flag,
-- Running count of non-NULL values per metric (the CASE has no ELSE, so
-- NULL rows add nothing to the SUM): the counter only advances on populated
-- rows, which makes every NULL row share a group id with the last populated
-- row before it. These grp_* ids drive the first_value() fill above.
sum(case when avg_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_avg_temperature,
sum(case when min_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_min_temperature,
sum(case when max_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_max_temperature,
sum(case when avg_windspeed is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_avg_windspeed,
sum(case when max_windgust is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_max_windgust,
sum(case when max_precipprobability is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_max_precipprobability,
sum(case when min_precipprobability is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_min_precipprobability
from semantic.weather) t
);
В настоящее время я буквально перевёл его в PySpark, но мне кажется странным создавать окна с partition по отдельной группирующей колонке без какого-либо order или диапазона — есть ли более чистый способ добиться того же эффекта? Цель — при встрече NULL-значений в данных смотреть назад и заполнять их значением из последней заполненной строки.
def ordered_count_non_nulls(count_c: str, order_c: str) -> pyspark.sql.Column:
    """Running count of non-null values of ``count_c``, ordered by ``order_c``.

    PySpark equivalent of the SQL pattern
    ``sum(case when <col> is not null then 1 end) over (order by <ord>)``:
    the counter advances only on rows where ``count_c`` is populated, so every
    null row ends up sharing a group id with the last populated row before it.

    ``count()`` ignores nulls natively, which replaces the hand-rolled
    ``sum(when(isNotNull, 1).otherwise(0))`` construction with the idiomatic
    aggregate while producing identical values.

    NOTE(review): an ordered window with no partitionBy collapses the whole
    dataset onto a single task; presumably fine for daily weather data, but
    confirm before reusing this helper on large tables.
    """
    return f.count(f.col(count_c)).over(w.orderBy(f.col(order_c)))
def fill_gaps(c: str) -> pyspark.sql.Column:
    """Replace nulls in column ``c`` with the value carried by its group.

    Each row was previously tagged with a running-count group id in the
    ``grp_<c>`` column; within one group the sole non-null value of ``c``
    is picked up by ``first(..., ignorenulls=True)`` and broadcast to every
    row of that group.
    """
    carry_window = w.partitionBy(f"grp_{c}")
    return f.first(c, ignorenulls=True).over(carry_window).alias(c)
def create_features_weather(
    semantic_weather: pyspark.sql.DataFrame,
) -> pyspark.sql.DataFrame:
    """
    Impute missing weather data from most recent complete date
    """
    # Metric columns that get the forward-fill treatment, listed in the
    # exact order they must appear in the output schema.
    metrics = [
        "avg_temperature",
        "min_temperature",
        "max_temperature",
        "avg_wind_speed",
        "max_wind_gust",
        "max_precip_probability",
        "min_precip_probability",
    ]

    # Step 1: tag every row with one group id per metric. The id advances
    # only on non-null rows, so null rows share an id with the last
    # populated row before them.
    grouped_counts_weather = semantic_weather.select(
        f.col("gen_date_weather"),
        *[f.col(m) for m in metrics],
        f.col("snow_accumulation_flag"),
        *[
            ordered_count_non_nulls(m, "gen_date_weather").alias(f"grp_{m}")
            for m in metrics
        ],
    )

    # Step 2: broadcast each group's single non-null value across the group,
    # and normalise the snow flag to a strict 0/1 indicator.
    return grouped_counts_weather.select(
        f.col("gen_date_weather"),
        *[fill_gaps(m) for m in metrics],
        f.when(f.col("snow_accumulation_flag") > 0, 1)
        .otherwise(0)
        .alias("snow_accumulation_flag"),
    )