Spark Dataframes являются неизменной структурой. Таким образом, вы не можете делать какие-либо обновления на основе идентификатора.
Способ обновления фрейма данных состоит в том, чтобы объединить старый и новый фреймы и сохранить объединенный фрейм данных в HDFS. Для обновления старого идентификатора вам потребуется какой-то ключ дедупликации (возможно, отметка времени).
Я добавляю пример кода для этого в scala. Вам необходимо вызвать функцию merge
с уникальным идентификатором и именем столбца отметки времени. Отметка времени должна быть в Long.
case class DedupableDF(unique_id: String, ts: Long);
def merge(snapshot: DataFrame)(
delta: DataFrame)(uniqueId: String, timeStampStr: String): DataFrame = {
val mergedDf = snapshot.union(delta)
return dedupeData(mergedDf)(uniqueId, timeStampStr)
}
def dedupeData(dataFrameToDedupe: DataFrame)(
uniqueId: String,
timeStampStr: String): DataFrame = {
import sqlContext.implicits._
def removeDuplicates(
duplicatedDataFrame: DataFrame): Dataset[DedupableDF] = {
val dedupableDF = duplicatedDataFrame.map(a =>
DedupableDF(a(0).asInstanceOf[String], a(1).asInstanceOf[Long]))
val mappedPairRdd =
dedupableDF.map(row ⇒ (row.unique_id, (row.unique_id, row.ts))).rdd;
val reduceByKeyRDD = mappedPairRdd
.reduceByKey((row1, row2) ⇒ {
if (row1._2 > row2._2) {
row1
} else {
row2
}
})
.values;
val ds = reduceByKeyRDD.toDF.map(a =>
DedupableDF(a(0).asInstanceOf[String], a(1).asInstanceOf[Long]))
return ds;
}
/** get distinct unique_id, timestamp combinations **/
val filteredData =
dataFrameToDedupe.select(uniqueId, timeStampStr).distinct
val dedupedData = removeDuplicates(filteredData)
dataFrameToDedupe.createOrReplaceTempView("duplicatedDataFrame");
dedupedData.createOrReplaceTempView("dedupedDataFrame");
val dedupedDataFrame =
sqlContext.sql(s""" select distinct duplicatedDataFrame.*
from duplicatedDataFrame
join dedupedDataFrame on
(duplicatedDataFrame.${uniqueId} = dedupedDataFrame.unique_id
and duplicatedDataFrame.${timeStampStr} = dedupedDataFrame.ts)""")
return dedupedDataFrame
}