Apache Spark: ошибка тайм-аута после 7 итераций - PullRequest
0 голосов
/ 22 июня 2019

Я должен выполнить проверку поиска моего исходного DF.Есть около 10 проверок поиска, которые мне нужно выполнить на 10 столбцах, присутствующих в исходном DF.Итак, я перебираю цикл for, чтобы проверить, присутствуют ли данные в исходных столбцах DF для каждой проверки поиска.

Программа работает нормально 7 раз (для 7 поисков).Начиная с 8-го поиска, программа останавливается на более длительное время, и после этого я получаю сброс соединения из-за ошибки пира

Ниже приведена логика:

  1. source DF -> Я разделяю данные на основе нулевых и ненулевых значений и создаю 2 DF
  2. Выполняется поискдля ненулевых значений DF путем объединения между исходным DF и поиском DF
  3. Для нулевых DF я заполняю значение как "99999"
  4. Как только вышеуказанные шаги выполнены, ясоздание объединения ненулевых и нулевых DF
  5. Я повторяю вышеупомянутые шаги для всех поисков.

Я пытался кэшировать unionDF (шаг # 5), но все еще япроблема производительности после 7 поисков

    for (lookupTableValues <- larry){
    print("\n")
    print(lookupTableValues)
    print("\n")
    lookupCheckCount = lookupCheckCount + 1
    val lookupTable = lookupTableValues.split(";")(0)
    lookUpCheckMap(lookupTable) = lookupCheckCount
if (!lookupTableValues.split(";")(1).equalsIgnoreCase("sqoop")) {
lookupKeys = lookupTableValues.split(";").drop(1)
parquetDf1 = sqlContext.read.parquet(basePath + "/" + lookupTable)
print("\n")
print(parquetDf1.count())
print("\n")
print(parquetDf1.rdd.getNumPartitions)
print("\n")
}else{
lookupKeys = lookupTableValues.split(";").drop(2)
val hc = new HiveContext(sc)
hc.sql("use " + sqoopedDatabase)
parquetDf1 = hc.sql("select * from " + lookupTable)
}
var joinKey: Column = null
var nullColumnFilter:Column = null
var notNullColumnFilter:Column = null
var lookUpKeyForPickup = ""
var lookUpKeyForPickupAlias = ""
if (lookupKeys.length == 1) {
for (lookupKey <- lookupKeys) {
print(lookupKey)
val firstlookupKey = lookupKey.split("\\=")(0)
val secondlookupKey = lookupKey.split("\\=")(1)
joinKey = upper(dfjoin(firstlookupKey)) === upper(parquetDf1(secondlookupKey))
lookUpKeyForPickup = secondlookupKey
lookUpKeyForPickupAlias = lookupTable + ":" + firstlookupKey
if(lookupTableCounter == 0)
{
filterNullDF = dfjoin.filter(dfjoin(firstlookupKey).isNull || dfjoin(firstlookupKey).===(""))
filterDF = dfjoin.except(filterNullDF)
}
else
{
filterNullDF = unionDF.filter(dfjoin(firstlookupKey).isNull || dfjoin(firstlookupKey).===("") )
filterDF = unionDF.except(filterNullDF)
}
filterNullDF = filterNullDF.withColumn(lookUpKeyForPickupAlias,lit(9999))
joinedDF = filterDF.as("d1").join(parquetDf1.as("d2"), joinKey, "left_outer").select($"d1.*", parquetDf1(lookUpKeyForPickup) as lookUpKeyForPickupAlias)
joinedDF.repartition(4)
unionDF = joinedDF.unionAll(filterNullDF)
print("\n")
print(unionDF.count())
print("\n")
unionDF.show()
}
}
else
{
for (lookupKey <- lookupKeys) {
val firstlookupKey = lookupKey.split("\\=")(0)
val secondlookupKey = lookupKey.split("\\=")(1)
if (joinKey == null) {
joinKey = upper(dfjoin(firstlookupKey)) === upper(parquetDf1(secondlookupKey))
lookUpKeyForPickup = secondlookupKey
lookUpKeyForPickupAlias = lookupTable + ":" + firstlookupKey
if (nullColumnFilter == null){
nullColumnFilter = (dfjoin(firstlookupKey).isNull || dfjoin(firstlookupKey).===(""))
}
} else {
joinKey = joinKey && upper(dfjoin(firstlookupKey)) === upper(parquetDf1(secondlookupKey))
nullColumnFilter = nullColumnFilter && (dfjoin(firstlookupKey).isNull || dfjoin(firstlookupKey).===(""))
}
}
if(lookupTableCounter == 0){
filterNullDF = dfjoin.filter(nullColumnFilter)
filterDF = dfjoin.except(filterNullDF)
}else{
filterNullDF = unionDF.filter(nullColumnFilter)
filterDF = unionDF.except(filterNullDF)
}
filterNullDF = filterNullDF.withColumn(lookUpKeyForPickupAlias,lit(9999))
joinedDF = filterDF.as("d1").join(parquetDf1.as("d2"), joinKey, "left_outer").select($"d1.*", parquetDf1(lookUpKeyForPickup) as lookUpKeyForPickupAlias)
joinedDF.repartition(4)
unionDf.unpersist()
unionDF = joinedDF.unionAll(filterNullDF)
unionDF.cache()
unionDF.show()
print("\n")
print(unionDF.rdd.getNumPartitions)
print("\n")
}
lookupTableCounter += 1
print("\n")
print(lookupTableCounter)
print("\n")
}
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...