У меня есть DataFrame, где мне нужно создать столбец на основе значений из каждой строки.Я использую UDF, который обрабатывает каждую строку и подключается к HBase для получения данных.
UDF создает соединение, возвращает данные, закрывает соединение.
Процесс медленный, поскольку Zookeeper зависает после нескольких чтений.Я хочу вытащить данные только с 1 открытым соединением.
Я пробовал mapwithpartition, но соединение не передается, поскольку оно не сериализовано.
UDF: -
val lookUpUDF = udf((partyID: Int, brand: String, algorithm: String, bigPartyProductMappingTableName: String, env: String) => lookUpLogic.lkpBigPartyAccount(partyID, brand, algorithm, bigPartyProductMappingTableName, env))
Как итерирует DataFrame: -
ocisPreferencesDF
.withColumn("deleteStatus", lookUpUDF(col(StagingBatchConstants.OcisPreferencesPartyId),
col(StagingBatchConstants.OcisPreferencesBrand), lit(EnvironmentConstants.digest_algorithm), lit
(bigPartyProductMappingTableName), lit(env)))
Основной логин: -
def lkpBigPartyAccount(partyID: Int,
brand: String,
algorithm: String,
bigPartyProductMappingTableName: String,
envVar: String,
hbaseInteraction: HbaseInteraction = new HbaseInteraction,
digestGenerator: DigestGenerator = new DigestGenerator): Array[(String, String)] = {
AppInit.setEnvVar(envVar)
val message = partyID.toString + "-" + brand
val rowKey = Base64.getEncoder.encodeToString(message.getBytes())
val hbaseAccountInfo = hbaseInteraction.hbaseReader(bigPartyProductMappingTableName, rowKey, "cf").asScala
val convertMap: mutable.HashMap[String, String] = new mutable.HashMap[String, String]
for ((key, value) <- hbaseAccountInfo) {
convertMap.put(key.toString, value.toString)
}
convertMap.toArray
}
Я ожидаю улучшенияпроизводительность кода.Я надеюсь создать соединение только один раз.