Pyspark DataFrame объединение двух DataFrame - PullRequest
0 голосов
/ 22 ноября 2018

У меня есть два кадра данных, скажем, df1 и df2: df1 имеет поля как CI_NAME, CLOSE_TIME, CH_ID и df2 имеет поля как NAME, TIMESTAMP, MEM_CONSUMED.В основном df1 имеет записи обновлений программного обеспечения, выполненных в системе, а df2 имеет записи мониторинга системы.

Мне нужно добавить поле в df1 с именем cpu_util_avg_before_update, сравнив CI_NAME равнымна NAME поле df2 и CLOSE_TIME между TIMESTAMP - 7 дней и TIMESTAMP, а затем принять среднее значение MEM_CONSUMED.

Как я могу это сделать, любая помощь будетоценил, как я пробовал udf, но это не принимает dataframe в качестве ввода.Спасибо

вот код, который я попробовал:

from pyspark.sql.functions import col,udf,struct

from dateutil import parser

import datetime

@udf

def memavgbeforeupdate(structx,df2):
    df=df2.where(col("name")==structx[1] & (col("timestamp")>parser.parse(structx[0])-datetime.timedelta(days=10) & col("timestamp")<parser.parse(structx[0])+datetime.timedelta(days=10)))

    df=df.where(col("mem_consumed_average")!="NaN").where(col("mem_consumed_average").isNotNull())
    if df.rdd.isEmpty():
        return -1
    else:
        df1=df.select("mem_consumed_average")
        return float(str(df1.select(mean(col("mem_consumed_average"))).collect()[0]).split("=")[1].split(")")[0])



df3=df1.withColumn("mem_avg_before_update",memavgbeforeupdate(struct(col("CLOSE_TIME"),col("CI_NAME")),df2))

Но это не работает и выдает ошибку как:

«Объект DataFrame» не имеет атрибута »_get_object_id '

...