Я использую zipWithIndex
, чтобы сгенерировать sequence_number
и добавить его в виде отдельного столбца.Я использую код, подобный приведенному ниже:
val file = sparkSession.createDataFrame(lexusmasterrdd,structSchema)
val filerdd=file.rdd.zipWithIndex().map(indexedRow => Row.fromSeq((((indexedRow._2.toLong+1)).toLong) +: indexedRow._1.toSeq))
val newSchema=StructType(Array(StructField("Sequence_number",LongType,true)).++(file.schema.fields))
val finalDF=sparkSession.createDataFrame(filerdd,newSchema)
Я сейчас пытаюсь придумать логику для инкрементальной загрузки для того же.Простая загрузка, где новые данные добавляются к существующим данным, а порядковые номера генерируются из последнего сгенерированного номера.
Один из способов добиться этого - получить max(Sequence_number)
, а затем добавить вместе с функцией row_number()
для новогоdata.
Но есть ли другой способ, которым я могу использовать zipWithIndex
в дополнительной нагрузке?Некоторый код будет полезен.
Я использую Spark 2.3 с Scala