мой набор данных
data=[['Joe','Waterloo','9/21/19 3:04 AM','9/21/19 3:18 AM'],['Stacy','Kirkwood','8/4/19 3:06 PM','8/4/19 3:54 PM'],['John','Waterloo','9/21/19 8:48 AM','9/21/19 9:05 AM'],
['Stacy','Kirkwood','9/21/19 4:06 PM', '9/21/19 4:54 PM'],
['Mo','Hashmi','9/21/19 1:06 PM', '9/21/19 5:54 PM'],
['Murti','Hash','9/21/19 1:00 PM', '9/21/19 3:00 PM'],
['Floo','Shmi','9/21/19 9:10 PM', '9/21/19 11:54 PM']]
cSchema = StructType([StructField("User", StringType())\
,StructField("Site", StringType())
, StructField("Sesh-Start", StringType())
, StructField("Sesh-End", StringType())])
df= spark.createDataFrame(data,schema=cSchema)
display(df)
разметка времени разбора
df1=df.withColumn("Start", F.from_unixtime(F.unix_timestamp("Sesh-Start",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).withColumn("End", F.from_unixtime(F.unix_timestamp("Sesh-End",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).drop("Sesh-Start","Sesh-End")
сборка и регистрация udf для нескольких часов на человека
def yo(a,b):
from datetime import datetime
d1 = datetime.strptime(str(a), '%Y-%m-%d %H:%M:%S')
d2 = datetime.strptime(str(b), '%Y-%m-%d %H:%M:%S')
y=[]
if d1.hour == d2.hour:
y.append(d1.hour)
else:
for i in range(d1.hour,d2.hour+1):
y.append(i)
return y
rng= udf(yo, ArrayType(IntegerType()))
разбить список часов на столбец
df2=df1.withColumn("new", rng(F.col("Start"),F.col("End"))).withColumn("new1",F.explode("new")).drop("new")
получить секунды за каждый час
df3=df2.withColumn("Seconds", when(F.hour("Start")==F.hour("End"), F.col("End").cast('long') - F.col("Start").cast('long'))
.when(F.hour("Start")==F.col("new1"), 3600-F.minute("Start")*60)
.when(F.hour("End")==F.col("new1"), F.minute("End")*60)
.otherwise(3600))
создать временное представление и запросить его
df3.createOrReplaceTempView("final")
display(spark.sql("Select new1, sum(Seconds) from final group by new1 order by new1"))
Приведенный выше ответ Леннарта может быть более убедительным, поскольку он использует объединение, чтобы получить все разные часы, вместо этого я использую UDF, который может быть медленнее. Мой код будет работать для любого пользователя, который может быть в сети в течение любого количества часов. В моих данных использовался только необходимый день, поэтому вы можете использовать приведенный выше фильтр дня, чтобы ограничить свой запрос указанным днем. Окончательный результат