Как рассчитать числовую разницу между столбцами разных фреймов данных? - PullRequest
1 голос
/ 23 октября 2019

Учитывая два искровых фрейма данных A и B с одинаковым количеством столбцов и строк, я хочу вычислить числовую разницу между двумя фреймами данных и сохранить ее в другом фрейме данных (или, возможно, в другой структуре данных).

Например, у нас есть следующие наборы данных

DataFrame A:

+----+---+
|  A | B |
+----+---+
|   1|  0|
|   1|  0|
+----+---+

DataFrame B:

----+---+
|  A | B |
+----+---+
|   1| 0 |
|   0| 0 |
+----+---+

Как получить BA, т.е.


+----+---+
| c1 | c2|
+----+---+
|   0| 0 |
|  -1| 0 |
+----+---+

На практике реальные кадры данных имеют последовательное количество строк и более 50 столбцов, для которых необходимо вычислить разницу. Что такое Spark / Scala?

Ответы [ 2 ]

1 голос
/ 24 октября 2019

Я смог решить эту проблему, используя подход ниже. Этот код может работать с любым количеством столбцов. Вы просто должны соответствующим образом изменить входные DF.

import org.apache.spark.sql.Row

val df0 = Seq((1, 5), (1, 4)).toDF("a", "b")
val df1 = Seq((1, 0), (3, 2)).toDF("a", "b")

val columns = df0.columns
    val rdd = df0.rdd.zip(df1.rdd).map {
      x =>
        val arr = columns.map(column =>
          x._2.getAs[Int](column) - x._1.getAs[Int](column))
        Row(arr: _*)
    }

spark.createDataFrame(rdd, df0.schema).show(false)

Сгенерированный вывод:

df0=>
+---+---+
|a  |b  |
+---+---+
|1  |5  |
|1  |4  |
+---+---+
df1=>
+---+---+
|a  |b  |
+---+---+
|1  |0  |
|3  |2  |
+---+---+
Output=>
+---+---+
|a  |b  |
+---+---+
|0  |-5 |
|2  |-2 |
+---+---+
0 голосов
/ 23 октября 2019

Если ваш df A такой же, как df B, вы можете попробовать следующий подход. Я не знаю, будет ли это работать правильно для больших наборов данных, будет лучше иметь идентификатор для объединения, а не создавать его с помощью monotonically_increasing_id().

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val df0 = Seq((1, 0), (1, 0)).toDF("a", "b")
  val df1 = Seq((1, 0), (0, 0)).toDF("a", "b")

  // new cols names
  val colNamesA = df0.columns.map("A_" + _)
  val colNamesB = df0.columns.map("B_" + _)

  // rename cols and add id
  val dfA = df0.toDF(colNamesA: _*)
    .withColumn("id", monotonically_increasing_id())
  val dfB = df1.toDF(colNamesB: _*)
    .withColumn("id", monotonically_increasing_id())

  dfA.show()
  dfB.show()

  // get columns without id
  val dfACols = dfA.columns.dropRight(1).map(dfA(_))
  val dfBCols = dfB.columns.dropRight(1).map(dfB(_))

  // diff between cols
  val calcCols = (dfACols zip dfBCols).map(s=>s._2-s._1)

  // join dfs
  val joined = dfA.join(dfB, "id")
  joined.show()

  calcCols.foreach(_.explain(true))

  joined.select(calcCols:_*).show()
    +---+---+---+
    |A_a|A_b| id|
    +---+---+---+
    |  1|  0|  0|
    |  1|  0|  1|
    +---+---+---+

    +---+---+---+
    |B_a|B_b| id|
    +---+---+---+
    |  1|  0|  0|
    |  0|  0|  1|
    +---+---+---+

    +---+---+---+---+---+
    | id|A_a|A_b|B_a|B_b|
    +---+---+---+---+---+
    |  0|  1|  0|  1|  0|
    |  1|  1|  0|  0|  0|
    +---+---+---+---+---+

    (B_a#26 - A_a#18)
    (B_b#27 - A_b#19)

    +-----------+-----------+
    |(B_a - A_a)|(B_b - A_b)|
    +-----------+-----------+
    |          0|          0|
    |         -1|          0|
    +-----------+-----------+


...