Это из моего опыта. У monotonically_increasing_id () есть некоторая грубость. Для небольших случаев использования вы всегда получите увеличенный идентификатор. Однако, если у вас есть сложные тасования или проблемы с использованием данных, они могут увеличиваться и не будут увеличиваться при одинаковом значении каждый тик. Под этим я подразумеваю, что DF1 перешел от 1-> ~ 100000000, однако во время перестановки DF2 был снова пересчитан из ленивой реализации Spark, он перешел с 1-> ~ 48000000, затем 48000001.23-> 100000000.23. Это означало, что я потерял кучу строк.
Как я решил проблему, было с помощью уникальных Row_ID. Для этого у меня была функция Row_Hash, ниже она должна была пройти и создать уникальный идентификатор строки в начале столбца. Независимо от того, сколько было перемешиваний или операций записи данных, я сохранял уникальность моих условий соединения.
РЕДАКТИРОВАТЬ: я собираюсь превратить все элементы метаданных фрейма данных в массивы. Причина этого в том, что вы можете указать, какие элементы массива вы хотите запросить. Это отличается от фрейма данных, поскольку из-за случайного перемешивания и перераспределения вызов take (n) может дать разные результаты, однако при вызове array (n) результаты всегда будут одинаковыми.
Имея это в виду, давайте вернемся к проблеме, нам нужно создать локальный идентификатор строки там, где его нет. Для этого мы полностью объединяем строки (это для сценариев, в которых нет ключей строк), вызывая MD5 поверх продукта (да, есть вероятность пересечения, но она чрезвычайно мала). Это приведет к большому символу строки для каждой строки, отделяя ее от остальной системы, позволяя пользователю использовать его в качестве уникального ключа соединения строк.
#Call in the input data frame
val inputDF = ...
#Returns a array of string on the columns of input dataframe
val columnArray = inputDF.columns
#In Scala a variable allows us to dynamically augment and update the value
#This is the start of the command where we are concatenating all fields and running and MD5, we just need to add in the other fields.
var commandString = "md5(concat("
#This will be a set of string of actions we want Spark to run on our columns.
#The reason we are passing through the names is because we want to return the base columns.
#Think of a select query
var commandArray = columnArray
#This is an iterator where we are going to move 1->n, n being the last element of the number of columns
var columnIterator = 1
#Run while there are still columns we have not acted upon.
while(columnIterator<=columnArray.length) {
#We are going to take an N element from the columns and build a statement to cast it as a string
commandString = "cast(" + columnArray(columnIterator-1) + " as string)"
#This loop checks if we are not the last element of the column array, if so we add
#in a comma this allows us to have N many element be concatenated (I add the space because it is aesthetically pleasing)
if (columnIterator!=columnArray.length) {commandString = commandString + ", "}
#Iterator
columnIterator = columnIterator + 1
}
#I am appending the command we just build to the from of the command array with
#a few extra characters to end the local command and name it something consistent.
#So if we have a DF of Name, Addr, Date; this below statement will look like
#Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string)) as Row_Hash","Name","Addr","Date")
val commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray
#Select Expr runs independent strings through a standard SQL framework (kinda little bit of column A, column B)
#Each string is its own element so based on the above example DF
#inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date)
#Will output a DF with four elements Name, an integer of the length of column Addr, Addr, and Date.
#In the previous lines of code we have build out those strings into the command array
#The usage of commandArray:_* means we want spark to run all elements of Array through the select statement.
val finalDF = inputDF.selectExpr(commandArray:_*)