Spark-Scala: добавочная загрузка данных в Spark Scala вместе с генерацией уникального идентификатора - PullRequest
0 голосов
/ 15 октября 2018

Я использую zipWithIndex, чтобы сгенерировать sequence_number и добавить его в виде отдельного столбца.Я использую код, подобный приведенному ниже:

val file = sparkSession.createDataFrame(lexusmasterrdd,structSchema)
val filerdd=file.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq))
val newSchema=StructType(Array(StructField("Sequence_number",LongType,true)).++(file.schema.fields))
val finalDF=sparkSession.createDataFrame(filerdd,newSchema)

Я сейчас пытаюсь придумать логику для инкрементальной загрузки для того же.Простая загрузка, где новые данные добавляются к существующим данным, а порядковые номера генерируются из последнего сгенерированного номера.

Один из способов добиться этого - получить max(Sequence_number), а затем добавить вместе с функцией row_number() для новогоdata.

Но есть ли другой способ, которым я могу использовать zipWithIndex в дополнительной нагрузке?Некоторый код будет полезен.

Я использую Spark 2.3 с Scala

1 Ответ

0 голосов
/ 15 октября 2018

Один из способов добиться этого - получить max (Sequence_number) и затем добавить вместе с функцией row_number () для новых данных.

Это будет работать, но не масштабируется, потому чтоrow_number() потребуется перетасовать все записи в 1 раздел.Я бы предпочел использовать monotonically_increasing_id():

//get max from "old" data
val prevMaxId = oldDf.select(max($"Sequence_number")).as[Long].head()
val addUniqueID : Column = monotonically_increasing_id() + prevMaxId

val finalDF = newDF.withColumn("Sequence_number",addUniqueID)

, если вы хотите использовать zipWithIndex, вы можете сделать что-то подобное:

//get max from "old" data
val prevMaxId = oldDf.select(max($"Sequence_number")).as[Long].head()    
val finalRDD = oldRdd.zipWithIndex().map{case (data,id) => (data, id+prevMaxId)}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...