Правильно соединяйте Apache Spark с фреймами данных, избегая нулевых значений - PullRequest
0 голосов
/ 14 января 2019

Привет всем!

У меня есть два DataFrames в apache spark (2.3), и я хочу присоединиться к ним правильно. Ниже я объясню, что я имею в виду под «правильно». Прежде всего, два кадра данных содержат следующую информацию:

nodeDf: (идентификатор, год, название, авторы, журнал, аннотация)
edgeDf: (srcId, dstId, label)

Метка может быть 0 или 1, если узел 1 связан с узлом 2 или нет.

Я хочу объединить эти два кадра данных, чтобы получить один кадр данных со следующей информацией:

JoinedDF: (id_from, year_from, title_from, journal_from, abstract_from, id_to, year_to, title_to, journal_to, abstract_to, time_dist)

time_dist = abs (year_from - year_to)

Когда я сказал «правильно», я имел в виду, что запрос должен быть настолько быстрым, насколько это возможно, и я не хочу содержать нулевые строки или клетки (значение в строке).

Я попробовал следующее, но мне потребовалось 500 -540 секунд, чтобы выполнить запрос, и последний фрейм данных содержит нулевые значения. Я даже не знаю, правильно ли соединились данные.

Я хочу упомянуть, что файл узла, из которого я создаю nodeDF, имеет 27770 строк, а файл края (edgeDf) имеет 615512 строк.

код:

val spark = SparkSession.builder().master("local[*]").appName("Logistic Regression").getOrCreate()
val sc = spark.sparkContext

val data = sc.textFile("resources/data/training_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1), fields(2).toInt)
})

val data2 = sc.textFile("resources/data/test_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1))
})

import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val testDF = data2.toDF("srcId","dstId")

val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")

val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")

println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())

println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())

println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")

println(joinedDF.count())

joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")

joinedDF.show(5)


val graphX = new GraphX()
val pageRankDf =graphX.computePageRank(spark,"resources/data/training_set.txt",0.0001)

println("Joining joinedDF and pageRankDf...")
joinedDF = joinedDF.as("a").join(pageRankDf.as("b"),$"a.srcId" === $"b.nodeId")

var dfWithRanks = joinedDF.select("srcId","dstId","label","year","title","authors","jurnal","abstract","rank").withColumnRenamed("rank","pgRank")
dfWithRanks.show(5)

println("Renameming joinedDF...")
dfWithRanks = dfWithRanks
  .withColumnRenamed("srcId","id_from")
  .withColumnRenamed("dstId","id_to")
  .withColumnRenamed("year","year_from")
  .withColumnRenamed("title","title_from")
  .withColumnRenamed("authors","authors_from")
  .withColumnRenamed("jurnal","jurnal_from")
  .withColumnRenamed("abstract","abstract_from")

var infoDfRenamed = dfWithRanks
  .withColumnRenamed("id_from","id_from")
  .withColumnRenamed("id_to","id_to")
  .withColumnRenamed("year_from","year_to")
  .withColumnRenamed("title_from","title_to")
  .withColumnRenamed("authors_from","authors_to")
  .withColumnRenamed("jurnal_from","jurnal_to")
  .withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","jurnal_to")

var finalDF = dfWithRanks.as("a").join(infoDF.as("b"),$"a.id_to" === $"b.srcId")

finalDF = finalDF
  .withColumnRenamed("year","year_to")
  .withColumnRenamed("title","title_to")
  .withColumnRenamed("authors","authors_to")
  .withColumnRenamed("jurnal","jurnal_to")
  .withColumnRenamed("abstract","abstract_to")

println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")

finalDF.show(5)  

Вот мои результаты!

enter image description here

Избегайте всех расчетов и кода, связанных с pgRank! Есть ли какой-нибудь правильный способ сделать это работает?

Ответы [ 2 ]

0 голосов
/ 14 января 2019

используйте оператор <=> в условии присоединения к столбцу

var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" <=> $"b.srcId") 

В Spark 2.1 или более поздней версии есть функция eqNullSafe

var joinedDF = trainingDF.join(infoDF,trainingDF("srcId").eqNullSafe(infoDF("srcId")))
0 голосов
/ 14 января 2019

Вы можете сначала отфильтровать свои данные, а затем присоединиться, в этом случае вы избежите пустых значений

df.filter ($ "ColumnName" .isNotNull)

...