Реализация автоинкрементного столбца в DataFrame - PullRequest
0 голосов
/ 26 апреля 2019

Я пытаюсь реализовать столбец автоинкремента в DataFrame.Я уже нашел решение, но я хочу знать, есть ли лучший способ сделать это.

Я использую функцию monotonically_increasing_id() из pyspark.sql.functions.Проблема с этим в том, что начать с 0, и я хочу, чтобы он начинался с 1.

Итак, я сделал следующее и работает нормально:

(F.monotonically_increasing_id()+1).alias("songplay_id")

dfLog.join(dfSong, (dfSong.artist_name == dfLog.artist) & (dfSong.title == dfLog.song))\
                    .select((F.monotonically_increasing_id()+1).alias("songplay_id"), \
                               dfLog.ts.alias("start_time"), dfLog.userId.alias("user_id"), \
                               dfLog.level, \
                               dfSong.song_id, \
                               dfSong.artist_id, \
                               dfLog.sessionId.alias("session_id"), \
                               dfLog.location, \
                               dfLog.userAgent.alias("user_agent"))

Есть ли лучший способ реализовать то, что я пытаюсь сделать?Я думаю, это слишком много работы, чтобы реализовать функцию udf только для этого, или только я?

Спасибо .-

1 Ответ

1 голос
/ 26 апреля 2019

Последовательность monotonically_increasing_id не гарантируется быть последовательной, но они гарантированно будут монотонно увеличиваться.Каждой задаче вашей работы будет присвоено начальное целое число, от которого оно будет увеличиваться на 1 в каждой строке, но между последним идентификатором одного пакета и первым идентификатором другого будет разрыв.Чтобы проверить это, вы можете создать задание, содержащее две задачи, перераспределив образец фрейма данных:

import pandas as pd
import pyspark.sql.functions as psf
spark.createDataFrame(pd.DataFrame([[i] for i in range(10)], columns=['value'])) \
    .repartition(2) \
    .withColumn('id', psf.monotonically_increasing_id()) \
    .show()
        +-----+----------+
        |value|        id|
        +-----+----------+
        |    3|         0|
        |    0|         1|
        |    6|         2|
        |    2|         3|
        |    4|         4|
        |    7|8589934592|
        |    5|8589934593|
        |    8|8589934594|
        |    9|8589934595|
        |    1|8589934596|
        +-----+----------+

Чтобы убедиться, что ваш индекс дает последовательные значения, вы можете использовать оконную функцию.

from pyspark.sql import Window
w = Window.orderBy('id')
spark.createDataFrame(pd.DataFrame([[i] for i in range(10)], columns=['value'])) \
    .withColumn('id', psf.monotonically_increasing_id()) \
    .withColumn('id2', psf.row_number().over(w)) \
    .show()
        +-----+---+---+
        |value| id|id2|
        +-----+---+---+
        |    0|  0|  1|
        |    1|  1|  2|
        |    2|  2|  3|
        |    3|  3|  4|
        |    4|  4|  5|
        |    5|  5|  6|
        |    6|  6|  7|
        |    7|  7|  8|
        |    8|  8|  9|
        |    9|  9| 10|
        +-----+---+---+

Примечания:

  • monotonically_increasing_id позволяет вам устанавливать порядок строк, когда они читаются, он начинается с 0 для первой задачии увеличивается, но не обязательно последовательно
  • row_number последовательно индексирует строки в упорядоченном окне и начинается с 1
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...