Как выполнить операцию UPSERT в Apache Spark? - PullRequest
0 голосов
/ 11 ноября 2019

Я пытаюсь обновить и вставить записи в старый Dataframe, используя уникальный столбец "ID", используя Apache Spark.

Ответы [ 2 ]

0 голосов
/ 11 ноября 2019

Spark Dataframes являются неизменной структурой. Таким образом, вы не можете делать какие-либо обновления на основе идентификатора.

Способ обновления фрейма данных состоит в том, чтобы объединить старый и новый фреймы и сохранить объединенный фрейм данных в HDFS. Для обновления старого идентификатора вам потребуется какой-то ключ дедупликации (возможно, отметка времени).

Я добавляю пример кода для этого в scala. Вам необходимо вызвать функцию merge с уникальным идентификатором и именем столбца отметки времени. Отметка времени должна быть в Long.

case class DedupableDF(unique_id: String, ts: Long);

def merge(snapshot: DataFrame)(
      delta: DataFrame)(uniqueId: String, timeStampStr: String): DataFrame = {
    val mergedDf = snapshot.union(delta)
    return dedupeData(mergedDf)(uniqueId, timeStampStr)

  }

def dedupeData(dataFrameToDedupe: DataFrame)(
      uniqueId: String,
      timeStampStr: String): DataFrame = {
    import sqlContext.implicits._

    def removeDuplicates(
        duplicatedDataFrame: DataFrame): Dataset[DedupableDF] = {
      val dedupableDF = duplicatedDataFrame.map(a =>
        DedupableDF(a(0).asInstanceOf[String], a(1).asInstanceOf[Long]))
      val mappedPairRdd =
        dedupableDF.map(row ⇒ (row.unique_id, (row.unique_id, row.ts))).rdd;
      val reduceByKeyRDD = mappedPairRdd
        .reduceByKey((row1, row2) ⇒ {
          if (row1._2 > row2._2) {
            row1
          } else {
            row2
          }
        })
        .values;
      val ds = reduceByKeyRDD.toDF.map(a =>
        DedupableDF(a(0).asInstanceOf[String], a(1).asInstanceOf[Long]))
      return ds;
    }

    /** get distinct unique_id, timestamp combinations **/
    val filteredData =
      dataFrameToDedupe.select(uniqueId, timeStampStr).distinct

    val dedupedData = removeDuplicates(filteredData)

    dataFrameToDedupe.createOrReplaceTempView("duplicatedDataFrame");
    dedupedData.createOrReplaceTempView("dedupedDataFrame");

    val dedupedDataFrame =
      sqlContext.sql(s""" select distinct duplicatedDataFrame.*
                  from duplicatedDataFrame
                  join dedupedDataFrame on
                  (duplicatedDataFrame.${uniqueId} = dedupedDataFrame.unique_id
                  and duplicatedDataFrame.${timeStampStr} = dedupedDataFrame.ts)""")
    return dedupedDataFrame
  }

0 голосов
/ 11 ноября 2019

Чтобы обновить Dataframe, вы можете выполнить соединение left_anti для уникальных столбцов, а затем UNION с Dataframe с новыми записями

def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String]): Dataset[_] = {
    val filteredNewDS = selectAndCastColumns(newDS, oldDS)
    oldDS.join(
      filteredNewDS,
      usingColumns,
      "left_anti")
      .select(oldDS.columns.map(columnName => col(columnName)): _*)
      .union(filteredNewDS.toDF)
  }

  def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
    val columns = ds.columns.toSet
    ds.select(refDS.columns.map(c => {
      if (!columns.contains(c)) {
        lit(null).cast(refDS.schema(c).dataType) as c
      } else {
        ds(c).cast(refDS.schema(c).dataType) as c
      }
    }): _*)
  }

val df = refreshUnion(oldDS, newDS, Seq("ID"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...