Заполнение пропущенных значений в строках по дате, чтобы использовались самые последние полные данные, в PySpark - PullRequest
0 голосов
/ 02 марта 2020

Я в процессе преобразования какого-то старого SQL кода в PySpark и пытаюсь рационализировать этот скрипт в своей голове:

DROP TABLE IF EXISTS features.weather;
-- Rebuild features.weather, forward-filling NULL gaps in every weather metric
-- from the most recent non-null observation by date
-- ("last observation carried forward").
CREATE TABLE IF NOT EXISTS features.weather as (
SELECT
    gen_date_weather,
    -- Each grp_* id is constant over a run of rows that starts at a non-null
    -- observation, and that run holds exactly one non-null value, so
    -- first_value(...) spreads that value across the run.
    -- NOTE(review): first_value over a partition with no ORDER BY and no
    -- IGNORE NULLS relies on the engine returning the non-null row first —
    -- confirm, or add an ORDER BY / IGNORE NULLS to make it deterministic.
    first_value(avg_temperature) over (partition by grp_avg_temperature) as avg_temperature,
    first_value(min_temperature) over (partition by grp_min_temperature) as min_temperature,
    first_value(max_temperature) over (partition by grp_max_temperature) as max_temperature,
    first_value(avg_windspeed) over (partition by grp_avg_windspeed) as avg_windspeed,
    first_value(max_windgust) over (partition by grp_max_windgust) as max_windgust,
    first_value(max_precipprobability) over (partition by grp_max_precipprobability) as max_precipprobability,
    first_value(min_precipprobability) over (partition by grp_min_precipprobability) as min_precipprobability,
    -- Collapse snow accumulation to a 0/1 flag.
    case when snowaccumulation_flag > 0 then 1 else 0 end as snowaccumulation_flag
FROM
(
    SELECT
        CAST(gen_date_weather as DATE) as gen_date_weather,
        max_windgust,
        max_precipprobability,
        min_precipprobability,
        avg_temperature,
        min_temperature,
        max_temperature,
        avg_windspeed,
        snowaccumulation_flag,
        -- Running count of non-null values up to the current date. The CASE
        -- has no ELSE, so NULL rows contribute NULL, which SUM skips: the
        -- count stays flat across NULL rows and therefore acts as a group id
        -- tying each NULL row to the latest preceding non-null row.
        sum(case when avg_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_avg_temperature,
        sum(case when min_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_min_temperature,
        sum(case when max_temperature is not null then 1 end) over (order by CAST(gen_date_weather as DATE)) as grp_max_temperature,
        sum(case when avg_windspeed is not null then 1 end) over (order by CAST(gen_date_weather as DATE))  as grp_avg_windspeed,
        sum(case when max_windgust is not null then 1 end) over (order by CAST(gen_date_weather as DATE))  as grp_max_windgust,
        sum(case when max_precipprobability is not null then 1 end) over (order by CAST(gen_date_weather as DATE))  as grp_max_precipprobability,
        sum(case when min_precipprobability is not null then 1 end) over (order by CAST(gen_date_weather as DATE))  as grp_min_precipprobability
        from semantic.weather) t
);

В настоящее время я буквально перенёс его в PySpark, но выглядит странно, что используется столько отдельных оконных разбиений без указания порядка или диапазона. Есть ли более чистый способ достичь того же эффекта? Цель — просматривать данные назад и заполнять любые значения NULL значением из последней заполненной строки.

def ordered_count_non_nulls(count_c: str, order_c: str) -> pyspark.sql.Column:
    """Running count of non-null values in ``count_c``, ordered by ``order_c``.

    The count stays flat across null rows, so it doubles as a group id that
    ties each null row to the most recent non-null observation.
    """
    # Boolean -> 0/1, then a cumulative sum over the date-ordered window.
    presence = f.col(count_c).isNotNull().cast("int")
    return f.sum(presence).over(w.orderBy(f.col(order_c)))


def fill_gaps(c: str) -> pyspark.sql.Column:
    """Spread the single non-null value of ``c`` across its ``grp_<c>`` group.

    Relies on a precomputed ``grp_<c>`` column: each group starts at a non-null
    observation and contains exactly one non-null value, which ``first`` (with
    nulls ignored) picks out and carries over the following null rows.
    """
    group_col = "grp_" + c
    filled = f.first(f.col(c), ignorenulls=True).over(w.partitionBy(group_col))
    return filled.alias(c)


def create_features_weather(
    semantic_weather: pyspark.sql.DataFrame,
) -> pyspark.sql.DataFrame:
    """
    Impute missing weather data from the most recent complete date.

    Each metric column's nulls are forward-filled (last observation carried
    forward, ordered by ``gen_date_weather``) and the snow accumulation
    column is collapsed to a 0/1 flag.

    :param semantic_weather: input DataFrame with one row per date, containing
        ``gen_date_weather``, the metric columns below, and
        ``snow_accumulation_flag``.
    :return: DataFrame with columns ``gen_date_weather``, the forward-filled
        metrics, and the binary ``snow_accumulation_flag``.
    """
    # Metrics that share the identical fill-forward treatment; driving both
    # selects from this list removes the seven-fold copy-paste of the
    # original and keeps the column set defined in exactly one place.
    metric_cols = [
        "avg_temperature",
        "min_temperature",
        "max_temperature",
        "avg_wind_speed",
        "max_wind_gust",
        "max_precip_probability",
        "min_precip_probability",
    ]

    # Step 1: attach a grp_<metric> running count per metric; the count stays
    # flat across null rows, grouping each null with its latest non-null row.
    grouped_counts_weather = semantic_weather.select(
        f.col("gen_date_weather"),
        *[f.col(c) for c in metric_cols],
        f.col("snow_accumulation_flag"),
        *[
            ordered_count_non_nulls(c, "gen_date_weather").alias("grp_" + c)
            for c in metric_cols
        ],
    )

    # Step 2: within each grp_<metric> group, broadcast the single non-null
    # value over the null rows; binarize the snow accumulation column.
    features_weather = grouped_counts_weather.select(
        f.col("gen_date_weather"),
        *[fill_gaps(c) for c in metric_cols],
        f.when(f.col("snow_accumulation_flag") > 0, 1)
        .otherwise(0)
        .alias("snow_accumulation_flag"),
    )

    return features_weather
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...