У меня есть два кадра данных, скажем, df1
и df2
: df1
имеет поля как CI_NAME
, CLOSE_TIME
, CH_ID
и df2
имеет поля как NAME
, TIMESTAMP
, MEM_CONSUMED
.В основном df1
имеет записи обновлений программного обеспечения, выполненных в системе, а df2
имеет записи мониторинга системы.
Мне нужно добавить поле в df1
с именем cpu_util_avg_before_update
, сравнив CI_NAME
равнымна NAME
поле df2
и CLOSE_TIME
между TIMESTAMP
- 7 дней и TIMESTAMP
, а затем принять среднее значение MEM_CONSUMED
.
Как я могу это сделать, любая помощь будетоценил, как я пробовал udf
, но это не принимает dataframe в качестве ввода.Спасибо
вот код, который я попробовал:
from pyspark.sql.functions import col,udf,struct
from dateutil import parser
import datetime
@udf
def memavgbeforeupdate(structx,df2):
df=df2.where(col("name")==structx[1] & (col("timestamp")>parser.parse(structx[0])-datetime.timedelta(days=10) & col("timestamp")<parser.parse(structx[0])+datetime.timedelta(days=10)))
df=df.where(col("mem_consumed_average")!="NaN").where(col("mem_consumed_average").isNotNull())
if df.rdd.isEmpty():
return -1
else:
df1=df.select("mem_consumed_average")
return float(str(df1.select(mean(col("mem_consumed_average"))).collect()[0]).split("=")[1].split(")")[0])
df3=df1.withColumn("mem_avg_before_update",memavgbeforeupdate(struct(col("CLOSE_TIME"),col("CI_NAME")),df2))
Но это не работает и выдает ошибку как:
«Объект DataFrame» не имеет атрибута »_get_object_id '