Я разработал искровое приложение, которое выполняет следующие шаги:
- Считывает данные в определенном формате из местоположения ADL, например:
adl://sampleaccount1.azuredatalakestore.net/sample_data/sample_1/
и создайте DataFrame.
- Выполнить определенные преобразования на входном DataFrame.
- Записывает данные в другое местоположение учетной записи adl, например:
adl://sample2.azuredatalakestore.net/processed_data/
Входные и выходные учетные записи ADL различны и, следовательно, имеют разные
учетные данные, связанные с ними.
При чтении данных я настраивал конфигурации adl следующим образом:
val options = Map(
"dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential",
"dfs.adls.oauth2.refresh.url" -> srcRefreshUrl,
"dfs.adls.oauth2.client.id" -> srcClientId,
"dfs.adls.oauth2.credential" -> srcClientKey
)
options.foreach {
case (key, value) => sparkSession.conf.set(key, value)
}
При сохранении окончательного Dataframe, т.е. outputDf, я устанавливаю учетные данные как:
val options = Map(
"dfs.adls.oauth2.access.token.provider.type" -> "ClientCredential",
"dfs.adls.oauth2.refresh.url" -> dstRefreshUrl,
"dfs.adls.oauth2.client.id" -> dstClientId,
"dfs.adls.oauth2.credential" -> dstClientKey
)
options.foreach {
case (key, value) => outputDf.sparkSession.sparkContext.hadoopConfiguration.set(key, value)
}
Поскольку учетные данные поддерживаются на уровне SparkContext, он не может прочитать данные из исходного местоположения при обработке входных данных.
Кто-нибудь пробовал этот сценарий?