Добавить новую строку в фрейм данных Pyspark на основе значений - PullRequest
2 голосов
/ 15 июня 2019

У меня есть такой фрейм данных:

client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD         |Apple      |1.55            |idf_1            |2019-06-01|
XX2@AD         |Apple      |30.12           |idf_2            |2019-06-04|
XX3@AD         |Apple      |78.25           |idf_3            |2019-06-02|
XX4@AD         |Apple      |0.45            |idf_1            |2019-06-02|
XX1@AD         |Apple      |23.11           |idf_1            |2019-06-02|

client_username - id of user in domain
workstation - user workstation
session_duration - duration (in hours) of the active session (user logged on hist host)
access_point_name - the name of access point that supplies the network to users host
start_date - start session

Я хотел бы получить фрейм данных, как это:

client_username|workstation|session_duration|access_point_name|start_date|
XX1@AD         |Apple      |1.55            |idf_1            |2019-06-01|
XX2@AD         |Apple      |8               |idf_2            |2019-06-04|
XX2@AD         |Apple      |8               |idf_2            |2019-06-05|
XX3@AD         |Apple      |8               |idf_3            |2019-06-02|
XX3@AD         |Apple      |8               |idf_3            |2019-06-03|
XX3@AD         |Apple      |8               |idf_3            |2019-06-04|
XX3@AD         |Apple      |8               |idf_3            |2019-06-05|
XX4@AD         |Apple      |0.45            |idf_1            |2019-06-02|
XX1@AD         |Apple      |23.11           |idf_1            |2019-06-02|

Идея заключается в следующем: * если продолжительность сеанса превышает 24 часа, но меньше 48 часов, я бы хотел изменить его:

XX2@AD         |Apple      |30.12           |idf_2            |2019-06-04|

к нему:

XX2@AD         |Apple      |8               |idf_2            |2019-06-04|
XX2@AD         |Apple      |8               |idf_2            |2019-06-05|

Продолжительность сеанса изменяется до 8 часов, но количество дней увеличивается до двух дней (2019-06-04 и 2019-06-05). Аналитические ситуации для продолжительности свыше 48 часов (3 дня), 72 часа (4 дня) и т. Д.

Я начинаю изучать pyspark. Я пытался использовать union или crossJoin на фрейме данных, но сейчас это очень сложно для меня. Я хотел бы сделать эту задачу с использованием pyspark.

1 Ответ

1 голос
/ 15 июня 2019

Вот несколько методов, которые вы можете попробовать:

Метод-1: строковые функции: повтор , подстрока

  1. рассчитать количество повторений n = ceil(session_duration/24)
  2. создать строку a, которая повторяет подстроку 8, для n раз, а затем использовать substring () или regexp_replace () , чтобы удалить запятую ,
  3. разделите a запятой и затем выделите ее в строки pos и session_duration
  4. отрегулируйте start_date на pos из вышеприведенного шага
  5. приведите строку session_duration в double

см. Пример кода ниже:

from pyspark.sql import functions as F

# assume the columns in your dataframe are read with proper data types
# for example using inferSchema=True
df = spark.read.csv('/path/to/file', header=True, inferSchema=True)

df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
        .withColumn('a', F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration')))

>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|client_username|workstation|session_duration|access_point_name|         start_date|  n|      a|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01 00:00:00|  1|   1.55|
|         XX2@AD|      Apple|           30.12|            idf_2|2019-06-04 00:00:00|  2|    8,8|
|         XX3@AD|      Apple|           78.25|            idf_3|2019-06-02 00:00:00|  4|8,8,8,8|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02 00:00:00|  1|   0.45|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02 00:00:00|  1|  23.11|
+---------------+-----------+----------------+-----------------+-------------------+---+-------+

df_new = df1.select(
          'client_username'
        , 'workstation'
        , F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
        , 'access_point_name'
        , F.expr('date_add(start_date, pos)').alias('start_date')
    ).drop('pos')

>>> df_new.show()
+---------------+-----------+----------------+-----------------+----------+
|client_username|workstation|session_duration|access_point_name|start_date|
+---------------+-----------+----------------+-----------------+----------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01|
|         XX2@AD|      Apple|               8|            idf_2|2019-06-04|
|         XX2@AD|      Apple|               8|            idf_2|2019-06-05|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-02|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-03|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-04|
|         XX3@AD|      Apple|               8|            idf_3|2019-06-05|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02|
+---------------+-----------+----------------+-----------------+----------+

Вышеприведенный код также можно записать в одну цепочку:

df_new = df.withColumn('n'
                , F.ceil(F.col('session_duration')/24).astype('int')
          ).withColumn('a'
                , F.when(F.col('n')>1, F.expr('substring(repeat("8,",n),0,2*n-1)')).otherwise(F.col('session_duration'))
          ).select('client_username'
                , 'workstation'
                , F.posexplode(F.split('a', ',')).alias('pos', 'session_duration')
                , 'access_point_name'
                , F.expr('date_add(start_date, pos)').alias('start_date')
          ).withColumn('session_duration'
                , F.col('session_duration').astype('double')
          ).drop('pos')

Метод-2: функция массива array_repeat (pyspark 2.4 +)

Аналогично методу 1, но a уже является массивом, поэтому нет необходимости разбивать строку на массив:

df1 = df.withColumn('n', F.ceil(F.col('session_duration')/24).astype('int')) \
        .withColumn('a', F.when(F.col('n')>1, F.expr('array_repeat(8,n)')).otherwise(F.array('session_duration')))

>>> df1.show()
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|client_username|workstation|session_duration|access_point_name|         start_date|  n|                   a|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+
|         XX1@AD|      Apple|            1.55|            idf_1|2019-06-01 00:00:00|  1|              [1.55]|
|         XX2@AD|      Apple|           30.12|            idf_2|2019-06-04 00:00:00|  2|          [8.0, 8.0]|
|         XX3@AD|      Apple|           78.25|            idf_3|2019-06-02 00:00:00|  4|[8.0, 8.0, 8.0, 8.0]|
|         XX4@AD|      Apple|            0.45|            idf_1|2019-06-02 00:00:00|  1|              [0.45]|
|         XX1@AD|      Apple|           23.11|            idf_1|2019-06-02 00:00:00|  1|             [23.11]|
+---------------+-----------+----------------+-----------------+-------------------+---+--------------------+

df_new = df1.select('client_username'
            , 'workstation'
            , F.posexplode('a').alias('pos', 'session_duration')
            , 'access_point_name'
            , F.expr('date_add(start_date, pos)').alias('start_date')
       ).drop('pos')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...