Добавление индекса строки в фрейм данных pyspark (чтобы добавить новый столбец / объединить фреймы данных параллельно) - PullRequest
0 голосов
/ 27 марта 2019

Я пытался объединить два кадра данных рядом. И я увидел это . В описании для monotonically_increasing_id () написано:

"monotonically_increasing_id () - возвращает монотонно увеличивающиеся 64-разрядные целые числа. Сгенерированный идентификатор гарантированно будет монотонно увеличиваться и уникален, но не последовательным. Текущая реализация помещает идентификатор раздела в верхние 31 бит, а младшие 33 бита представляют номер записи в каждом разделе. Предполагается, что кадр данных содержит менее 1 миллиарда разделов, а каждый раздел содержит менее 8 миллиардов записей. Функция является недетерминированной, поскольку ее результат зависит от идентификаторов разделов ».

Я пытаюсь понять, как мы предполагаем, что monotonically_increasing_id () дает одинаковые результаты для обоих этих фреймов данных, поскольку они недетерминированы. Если он генерирует разные row_numbers для этих фреймов данных, они не присоединятся. «Результат зависит от идентификаторов разделов», часть может быть ответом, но я не понимаю этого. Может кто-нибудь объяснить?

Ответы [ 2 ]

0 голосов
/ 27 марта 2019

Это лучший способ, который я нашел до сих пор, для добавления индекса к фрейму данных df:

new_columns = df.columns + ["row_idx"]

# Adding row index
df = df\
    .rdd\
    .zipWithIndex()\
    .map(lambda(row, rowindex): row + (rowindex,)).toDF()

# Renaming all the columns
df = df.toDF(*new_columns)

Он имеет издержки на преобразование в rdd и затем обратно вкадр данных.Тем не менее, monotonically_increasing_id() является недетерминированным и row_number() требует Window, который не может быть идеальным, если не используется с PARTITION BY, в противном случае он перетасовывает все данные в один раздел, побеждая цельof pyspark.

Итак, чтобы добавить список в качестве нового столбца в кадре данных, просто преобразуйте список в кадр данных

new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])

и добавьте к нему row_number, как указано выше.Тогда присоединяйтесь,

joined_df = df.join(new_df, ['row_idx'], 'inner')
0 голосов
/ 27 марта 2019

Это из моего опыта. У monotonically_increasing_id () есть некоторая грубость. Для небольших случаев использования вы всегда получите увеличенный идентификатор. Однако, если у вас есть сложные тасования или проблемы с использованием данных, они могут увеличиваться и не будут увеличиваться при одинаковом значении каждый тик. Под этим я подразумеваю, что DF1 перешел от 1-> ~ 100000000, однако во время перестановки DF2 был снова пересчитан из ленивой реализации Spark, он перешел с 1-> ~ 48000000, затем 48000001.23-> 100000000.23. Это означало, что я потерял кучу строк.

Как я решил проблему, было с помощью уникальных Row_ID. Для этого у меня была функция Row_Hash, ниже она должна была пройти и создать уникальный идентификатор строки в начале столбца. Независимо от того, сколько было перемешиваний или операций записи данных, я сохранял уникальность моих условий соединения.

РЕДАКТИРОВАТЬ: я собираюсь превратить все элементы метаданных фрейма данных в массивы. Причина этого в том, что вы можете указать, какие элементы массива вы хотите запросить. Это отличается от фрейма данных, поскольку из-за случайного перемешивания и перераспределения вызов take (n) может дать разные результаты, однако при вызове array (n) результаты всегда будут одинаковыми.

Имея это в виду, давайте вернемся к проблеме, нам нужно создать локальный идентификатор строки там, где его нет. Для этого мы полностью объединяем строки (это для сценариев, в которых нет ключей строк), вызывая MD5 поверх продукта (да, есть вероятность пересечения, но она чрезвычайно мала). Это приведет к большому символу строки для каждой строки, отделяя ее от остальной системы, позволяя пользователю использовать его в качестве уникального ключа соединения строк.

#Call in the input data frame
val inputDF = ...

#Returns a array of string on the columns of input dataframe
val columnArray = inputDF.columns

#In Scala a variable allows us to dynamically augment and update the value
#This is the start of the command where we are concatenating all fields and running and MD5, we just need to add in the other fields. 
var commandString = "md5(concat("
#This will be a set of string of actions we want Spark to run on our columns. 
#The reason we are passing through the names is because we want to return the base columns. 
#Think of a select query
var commandArray = columnArray

#This is an iterator where we are going to move 1->n, n being the last element of the number of columns
var columnIterator = 1

#Run while there are still columns we have not acted upon.
while(columnIterator<=columnArray.length) {

  #We are going to take an N element from the columns and build a statement to cast it as a string 
  commandString = "cast(" + columnArray(columnIterator-1) + " as string)"

  #This loop checks if we are not the last element of the column array, if so we add 
  #in a comma this allows us to have N many element be concatenated (I add the space because it is aesthetically pleasing)
  if (columnIterator!=columnArray.length) {commandString = commandString + ", "}
  #Iterator
  columnIterator = columnIterator + 1
}

#I am appending the command we just build to the from of the command array with 
#a few extra characters to end the local command and name it something consistent. 
#So if we have a DF of Name, Addr, Date; this below statement will look like 
#Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string)) as Row_Hash","Name","Addr","Date")
val commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray

#Select Expr runs independent strings through a standard SQL framework (kinda little bit of column A, column B)
#Each string is its own element so based on the above example DF 
#inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date) 
#Will output a DF with four elements Name, an integer of the length of column Addr, Addr, and Date. 
#In the previous lines of code we have build out those strings into the command array
#The usage of commandArray:_* means we want spark to run all elements of Array through the select statement.
val finalDF = inputDF.selectExpr(commandArray:_*)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...