Как сгенерировать инкрементный sub_id не уникальный с помощью Pyspark - PullRequest
0 голосов
/ 16 января 2020

Моя цель - создать случайный id и инкрементный sub_id . Более подробное объяснение моей проблемы вы найдете ниже.

исходный кадр данных

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

df.show()

+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
|  1|   110|   aaa|walk|work|
|  2|   110|   aaa|walk|work|
|  3|   110|   aaa| bus|work|
|  4|   110|   aaa| bus|work|
|  5|   110|   aaa|walk|work|
|  6|   110|   bbb|walk|home|
|  7|   110|   bbb| bus|home|
|  8|   110|   bbb| bus|home|
|  9|   110|   bbb|walk|home|
+---+------+------+----+----+

Для генерации trip_id (может быть случайным) столбец, который я использовал:

df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest'))).sort('idx')

+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
|  1|   110|   aaa|walk|work|      1|
|  2|   110|   aaa|walk|work|      1|
|  3|   110|   aaa| bus|work|      1|
|  4|   110|   aaa| bus|work|      1|
|  5|   110|   aaa|walk|work|      1|
|  6|   110|   bbb|walk|home|      6|
|  7|   110|   bbb| bus|home|      6|
|  8|   110|   bbb| bus|home|      6|
|  9|   110|   bbb|walk|home|      6|
+---+------+------+----+----+-------+

Для генерации subtrip_id для каждого trip_id , я использовал:

df_subtrip = df_trip.withColumn("subtrip_id", F.row_number().over(Window.partitionBy(['p_uuid', 'u_uuid', 'dest', 'mode']).orderBy('idx')))

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|       122|
|  2|   110|   aaa|walk|work|      1|       122|
|  3|   110|   aaa| bus|work|      1|       123|
|  4|   110|   aaa| bus|work|      1|       123|
|  5|   110|   aaa|walk|work|      1|       124|
|  6|   110|   bbb|walk|home|      6|       997|
|  7|   110|   bbb| bus|home|      6|       998|
|  8|   110|   bbb| bus|home|      6|       998|
|  9|   110|   bbb|walk|home|      6|       999|
+---+------+------+----+----+-------+----------+

Упс !! это не то, что я ищу, проблема в том, что я не могу создать sub_id incremntale like.

Что я ищу:

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|         1|
|  2|   110|   aaa|walk|work|      1|         1|
|  3|   110|   aaa| bus|work|      1|         2|
|  4|   110|   aaa| bus|work|      1|         2|
|  5|   110|   aaa|walk|work|      1|         3|
|  6|   110|   bbb|walk|home|      6|         1|
|  7|   110|   bbb| bus|home|      6|         2|
|  8|   110|   bbb| bus|home|      6|         2|
|  9|   110|   bbb|walk|home|      6|         3|
+---+------+------+----+----+-------+----------+

Ответы [ 2 ]

4 голосов
/ 23 января 2020

В качестве оптимизации ответа cronoik, вы можете использовать тот факт, что у вас уже есть глобальный уникальный столбец 'idx' в вашем наборе данных, и установить ваши 'trip_id' и 'subtrip_id' в качестве idx строки, в которой отключение или подстрока начинается.

Это позволит вам сгенерировать trip_id и subtrip_id за один проход с монотонно увеличивающимися целыми числами. Поскольку вам нужно, чтобы subtrip_id начинался с 1 и увеличивался с шагом 1, у вас будет второй проход окна:

import sys
import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

tripW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')

df_final  = df\
  .withColumn("trip_id", F.first('idx').over(tripW))\
  .withColumn('lagged_mode', F.lag('mode', default='').over(tripW))\
  .withColumn('subtrip_id', (F.col('mode') != F.col('lagged_mode')).cast('int'))\
  .withColumn('subtrip_id',F.sum('subtrip_id').over(tripW))\
  .drop('lagged_mode')\
  .sort('idx')

df_final.show()

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|         1|
|  2|   110|   aaa|walk|work|      1|         1|
|  3|   110|   aaa| bus|work|      1|         2|
|  4|   110|   aaa| bus|work|      1|         2|
|  5|   110|   aaa|walk|work|      1|         3|
|  6|   110|   bbb|walk|home|      6|         1|
|  7|   110|   bbb| bus|home|      6|         2|
|  8|   110|   bbb| bus|home|      6|         2|
|  9|   110|   bbb|walk|home|      6|         3|
+---+------+------+----+----+-------+----------+

Если вы посмотрите на сгенерированный план, Spark может построить его довольно эффективно:

df_final.explain()

TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#12637,u_uuid#12638,p_uuid#2863,mode#2864,dest#2865,trip_id#12642,subtrip_id#12643])
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, subtrip_id#12614L]
   +- Window [sum(cast(subtrip_id#12604 as bigint)) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#12614L], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
      +- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, cast(NOT (mode#2864 = lagged_mode#12596) as int) AS subtrip_id#12604]
         +- Window [first(idx#2861L, false) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS trip_id#12589L, lag(mode#2864, 1, ) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lagged_mode#12596], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
            +- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
                  +- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]

Решение cronoik использует другие критерии сортировки или окна и, таким образом, потребует более дорогих шагов сортировки и более длительного времени выполнения:

df_subtrip.sort('idx').explain()

TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#2948,u_uuid#2949,p_uuid#2863,mode#2864,dest#2865,trip_id#2953,subtrip_id#2954])
+- *(4) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, subtrip_id#2933]
   +- Window [last(subtrip_id#2923, true) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#2933], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
      +- *(3) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
         +- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN (subtrip_id#2913 = 1) THEN _we0#2924 ELSE null END AS subtrip_id#2923]
            +- Window [row_number() windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#2924], [u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913], [idx#2861L ASC NULLS FIRST]
               +- *(2) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, subtrip_id#2913 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
                  +- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN NOT (mode#2864 = _we0#2914) THEN 1 ELSE null END AS subtrip_id#2913]
                     +- Window [lag(mode#2864, 1, SOMETHING) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#2914], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
                        +- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
                           +- *(1) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, SPARK_PARTITION_ID() AS trip_id#2887]
                              +- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
                                 +- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]
2 голосов
/ 21 января 2020

В настоящее время вы не учитываете значение mode предыдущей строки в вашем операторе df_subtrip, и я также думаю, что ваш оператор trip_id может вызвать исключение OOM, поскольку все ваши данные загружаются в один раздел. Пожалуйста, посмотрите на прокомментированный пример ниже:

import sys
import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

df.show()

#your trip_id statement will load all your data to one partition which isn't recommend and can cause OOM
#df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest')))
#the following could(!) increase the performance
df = df.repartition('u_uuid', 'p_uuid', 'dest')
df_trip = df.withColumn("trip_id", F.spark_partition_id())

df_trip.show()

defaultW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')

#mark the first row of each group with 1
df_subtrip = df_trip.withColumn("subtrip_id", F.when(df_trip.mode != F.lag(df_trip.mode, default='SOMETHING').over(defaultW), 1).otherwise(None))

#gives each first row of a group a row_number
df_subtrip = df_subtrip.withColumn("subtrip_id", F.when(df_subtrip.subtrip_id == 1 , F.row_number().over(Window.partitionBy('u_uuid', 'p_uuid', 'dest', 'subtrip_id' ).orderBy('idx'))).otherwise(None))

#forward-fill the empty subtrip_id's
df_subtrip = df_subtrip.withColumn('subtrip_id', F.last('subtrip_id', True).over(defaultW.rowsBetween(-sys.maxsize,0)))

df_subtrip.sort('idx').show()

Вывод:

+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
|  1|   110|   aaa|walk|work|
|  2|   110|   aaa|walk|work|
|  3|   110|   aaa| bus|work|
|  4|   110|   aaa| bus|work|
|  5|   110|   aaa|walk|work|
|  6|   110|   bbb|walk|home|
|  7|   110|   bbb| bus|home|
|  8|   110|   bbb| bus|home|
|  9|   110|   bbb|walk|home|
+---+------+------+----+----+

+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
|  5|   110|   aaa|walk|work|     43|
|  1|   110|   aaa|walk|work|     43|
|  2|   110|   aaa|walk|work|     43|
|  3|   110|   aaa| bus|work|     43|
|  4|   110|   aaa| bus|work|     43|
|  6|   110|   bbb|walk|home|     62|
|  7|   110|   bbb| bus|home|     62|
|  8|   110|   bbb| bus|home|     62|
|  9|   110|   bbb|walk|home|     62|
+---+------+------+----+----+-------+

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|     43|         1|
|  2|   110|   aaa|walk|work|     43|         1|
|  3|   110|   aaa| bus|work|     43|         2|
|  4|   110|   aaa| bus|work|     43|         2|
|  5|   110|   aaa|walk|work|     43|         3|
|  6|   110|   bbb|walk|home|     62|         1|
|  7|   110|   bbb| bus|home|     62|         2|
|  8|   110|   bbb| bus|home|     62|         2|
|  9|   110|   bbb|walk|home|     62|         3|
+---+------+------+----+----+-------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...