pyspark daterange расчеты в искре - PullRequest
0 голосов
/ 27 января 2020

Я пытаюсь обработать данные сеанса входа на сайт каждого пользователя. Я читаю файл журнала сеанса S3 в СДР. Данные выглядят примерно так.

----------------------------------------
User | Site   | Session start   | Session end
---------------------------------------
Joe  |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM

Stacy|Kirkwood| 8/4/19 3:06 PM  |8/4/19 3:54 PM

John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM

Stacy|Kirkwood| 8/4/19 4:16 PM  |8/4/19 5:41 PM
...
...

Я хочу узнать, сколько пользователей было зарегистрировано в каждую секунду часа в данный день.

Пример: я могу обрабатывать эти данные только для 9/21/19. Итак, мне нужно будет удалить все другие записи, а затем сеансы пользователей SUM для каждой секунды часа в течение всех 24 часов 21.09.19. Выходные данные должны быть, возможно, 24 строками для всех часов 21.09.19, а затем считать для каждой секунды дня (yikes, данные секунда за секундой!).

Можно ли это сделать в pyspark, используя rdds или DF? (Извиняюсь за опоздание в построении сетки). Спасибо

Ответы [ 2 ]

0 голосов
/ 03 февраля 2020

мой набор данных

data=[['Joe','Waterloo','9/21/19 3:04 AM','9/21/19 3:18 AM'],['Stacy','Kirkwood','8/4/19 3:06 PM','8/4/19 3:54 PM'],['John','Waterloo','9/21/19 8:48 AM','9/21/19 9:05 AM'],
          ['Stacy','Kirkwood','9/21/19 4:06 PM', '9/21/19 4:54 PM'],
         ['Mo','Hashmi','9/21/19 1:06 PM', '9/21/19 5:54 PM'],
         ['Murti','Hash','9/21/19 1:00 PM', '9/21/19 3:00 PM'],
         ['Floo','Shmi','9/21/19 9:10 PM', '9/21/19 11:54 PM']]
    cSchema = StructType([StructField("User", StringType())\
                          ,StructField("Site", StringType())
                          , StructField("Sesh-Start", StringType())
                          , StructField("Sesh-End", StringType())])
    df= spark.createDataFrame(data,schema=cSchema)
    display(df)

разметка времени разбора

df1=df.withColumn("Start", F.from_unixtime(F.unix_timestamp("Sesh-Start",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).withColumn("End", F.from_unixtime(F.unix_timestamp("Sesh-End",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).drop("Sesh-Start","Sesh-End")

сборка и регистрация udf для нескольких часов на человека

def yo(a,b):

  from datetime import datetime
  d1 = datetime.strptime(str(a), '%Y-%m-%d %H:%M:%S')
  d2 = datetime.strptime(str(b), '%Y-%m-%d %H:%M:%S')
  y=[]
  if d1.hour == d2.hour:
     y.append(d1.hour)
  else:
     for i in range(d1.hour,d2.hour+1):
        y.append(i)

  return y

rng= udf(yo, ArrayType(IntegerType()))

разбить список часов на столбец

df2=df1.withColumn("new", rng(F.col("Start"),F.col("End"))).withColumn("new1",F.explode("new")).drop("new")

получить секунды за каждый час

df3=df2.withColumn("Seconds", when(F.hour("Start")==F.hour("End"), F.col("End").cast('long') - F.col("Start").cast('long'))
               .when(F.hour("Start")==F.col("new1"), 3600-F.minute("Start")*60)
               .when(F.hour("End")==F.col("new1"), F.minute("End")*60)
               .otherwise(3600))

создать временное представление и запросить его

df3.createOrReplaceTempView("final")
display(spark.sql("Select new1, sum(Seconds) from final group by new1 order by new1"))

Приведенный выше ответ Леннарта может быть более убедительным, поскольку он использует объединение, чтобы получить все разные часы, вместо этого я использую UDF, который может быть медленнее. Мой код будет работать для любого пользователя, который может быть в сети в течение любого количества часов. В моих данных использовался только необходимый день, поэтому вы можете использовать приведенный выше фильтр дня, чтобы ограничить свой запрос указанным днем. Окончательный результат

0 голосов
/ 28 января 2020

Попробуйте проверить это:

Инициализировать фильтр.

val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")

Создать диапазон (0 .. 23).

hours = spark.range(24).collect()

Получить реальные пользовательские сеансы, которые соответствуют фильтр.

df = sessions.alias("s") \
    .where(filter >= to_date(s.start) & filter <= to_date(s.end)) \
    .select(s.user, \
            when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
            when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))

Объединение совпадающих сеансов пользователей с диапазоном часов.

df2 = df.join(hours, hours.id.between(hour(df.start), hour(df.end)), 'inner') \
    .select(df.user, hours.id.alias("hour"), \
        (when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
         when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))

Создание сводки: вычисление количества пользователей и суммы секунд для каждого часа сеансов.

df2.groupBy(df2.hour)\
    .agg(count(df2.user).alias("user counts"), \
         sum(dg2.seconds).alias("seconds")) \
    .show()

Надеюсь, это поможет.

...