Как получить значения х в виде пары на фрейме данных в Scala? - PullRequest
0 голосов
/ 18 января 2019

У меня есть фрейм данных с 27770 записями в scala с использованием spark. Этот информационный кадр содержит только один столбец целых чисел. Я хочу объединить этот столбец по его полке для создания нового кадра данных с парами на 2 значения. Я хочу сделать это для каждой строки в dataframe. Я пытаюсь сделать это с помощью кода ниже:

for (elem1 <- nodeDf.collect()) {
  for (elem2 <- nodeDf.collect()) {
      if(elem1 != elem2 && elem2 > elem1) {
        //get pair elem1, elem2
      }
  }
}  

Intellij показывает мне ошибку об операторе «>», который говорит, что «не может разрешить символ».
Что я делаю не так? Как я могу получить новый фрейм данных с двумя столбцами для каждой комбинации всех значений?

Например: входной фрейм данных содержит

1
2
3

Я хочу получить новый кадр данных с парами, как показано ниже:

1,2
1,3
2,3

Я хочу пропустить пары типа 1,1, 2,2 или 2,1, потому что у меня уже есть 1,2, что для меня одинаково.

Спасибо.

Ответы [ 3 ]

0 голосов
/ 18 января 2019

Вам нужно пересечь, войдите в тот же DS. После вы можете написать предложение where, которое возвращает только строку с другим номером между двумя столбцами и только строку, в которой ANumber меньше BNumber. Это пример:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.scalatest.FunSuite

class Test extends FunSuite { 

  test("Test spark cross join") {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val rows = Seq(Row(1),Row(2),Row(3))
    val schema = StructType(Seq(StructField("Number",DataTypes.IntegerType)))
    val ds = spark.createDataset(rows)(RowEncoder(schema))

    val crossJoinDs = ds.select($"Number".as("ANumber"))
      .crossJoin(ds.select($"Number".as("BNumber")))
      .where($"ANumber" =!=  $"BNumber" && $"ANumber" < $"BNumber")
      .map(r => String.valueOf(r(0))+","+String.valueOf(r(1)))(Encoders.STRING)

    crossJoinDs.show()

  }

, которые печатают следующий вывод:

+-----+
|value|
+-----+
|  1,2|
|  1,3|
|  2,3|
+-----+

Когда вы пишете сбор и перебираете результат, вы отправляете все данные в drivernode. В основном вы останавливаете распределенное вычисление произведений.

0 голосов
/ 18 января 2019

Хорошо! Наконец, я нашел это. Я должен сделать только SQL-запрос, как показано ниже:

result.createOrReplaceTempView("pairs")
var pairsDF = result.sqlContext.sql("select * from pairs a, pairs b where a.id < b.id").toDF("id_from","id_to")  

Результат:

enter image description here

Результат проверен для каждой строки и признан работающим правильно! Спасибо, ребята.

0 голосов
/ 18 января 2019

Вы можете просто сделать декартово произведение из DataFrame с собой.

val result =
  df.as("a").crossJoin(
    df.as("b")
  ).filter(
    ($"a.id" =!= $"b.id") && ($"b.id" > $"a.id")
  )

Причина, по которой ваш код не работал, заключается в том, что выполнение collect на DataFrame даст вам Array[Row], а в Row.
такого метода нет >. Вы можете исправить это, преобразовав DataFrame в Dataset[Int] с помощью .as[Int] или выбрав элемент строки с помощью elem1.getAsInt(0), но ...

НЕ ДЕЛАЙТЕ ЭТОГО! .
Collect возвращает все ваши распределенные данные драйверу, они не только опасны, но и уничтожают все цели самой Spark.
Кроме того, не говоря уже о том, что двойные collect тех же данных бесполезны и дороги .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...