Как использовать функцию окна Pysark для новых необработанных данных? - PullRequest
0 голосов
/ 23 октября 2019

Я разработал оконные функции в DataFrame pyspark для расчета общей суммы транзакции, сделанной клиентом ежемесячно за транзакцию.

Например:

В таблице ввода есть данные:

Input Table

И функция окна обрабатывает данные и вставляетэто в таблицу

Processed Table

Теперь, если я получаю новые транзакции сегодня, я хочу разработать код, в котором он загружает транзакцию последнего месяца в фрейм данных spark изапуск оконной функции на новых строках и сохранение ее в обработанной таблице. Текущая оконная функция будет обрабатывать все строки, а затем необходимо вручную избегать уже вставленных записей и вставлять только новые записи. Это будет использовать большие ресурсы и большой объем памяти, когда оконная функция становится на год.

#Function to apply window function
def cumulative_total_CR(df, from_column, to_column, window_function):
    intermediate_column = from_column + "_temp"
    df = df.withColumn(from_column,df[from_column].cast("double"))

    df = df.withColumn(intermediate_column,when(col("Flow") == 'C',df[from_column]).otherwise(0))

    df = df.withColumn(to_column, F.sum(intermediate_column).over(window_function))

    return df

def cumulative_total_DR(df, from_column, to_column, window_function):
    intermediate_column = from_column + "_temp"
    df = df.withColumn(from_column,df[from_column].cast("double"))

    df = df.withColumn(intermediate_column,when(col("Flow") == 'D',df[from_column]).otherwise(0))

    df = df.withColumn(to_column, F.sum(intermediate_column).over(window_function))

    return df

#Window Function:

window = (Window.partitionBy("CUSNO").orderBy(F.col(TxnDateTime).cast('long')).rangeBetween(-30,0))

df = load.data.from.hive
#appending TxnDate and TxnTime into new column TxnDateTime with type casting as timestamp and format as 'yyyy-MM-dd HH:mm:ss.SSS'
df = cumulative_total_CR(df, "TXNAMT", "Total_Cr_Monthly_Amt", window_function_30_days)
df = cumulative_total_DR(df, "TXNAMT", "Total_Dr_Monthly_Amt", window_function_30_days)                               

df = saving.data.to.disk for new records
...