Я не уверен, что это ошибка Spark по-другому относиться к Pipe (), но я открыл похожую проблему на JIRA: https://issues.apache.org/jira/projects/SPARK/issues/SPARK-26101
Теперь перейдем к проблеме.Очевидно, что в кластере YARN Spark Pipe () запрашивает контейнер, независимо от того, является ли ваш Hadoop незащищенным или защищен Kerberos, это разница между тем, запускает ли контейнер пользователь yarn/nobody
или пользователем, который запускает контейнер your actual user
.
Либо используйте Kerberos для защиты вашего Hadoop, либо, если вы не хотите проходить через защиту Hadoop, вы можете установить две конфигурации в YARN, которые используют пользователей / группы Linux для запуска контейнера. Примечание , вы должны совместно использовать одинаковых пользователей / групп во всех узлах вашего кластера.В противном случае это не сработает.(возможно, используйте LDAP / AD для синхронизации ваших пользователей / групп)
Задайте следующие параметры:
yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users = false
yarn.nodemanager.container-executor.class = org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor
Источник: https://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html (то же самое даже в Hadoop 3.0)
Это исправление работало на последнем CDH 5.15.1 Cloudera (yarn-site.xml): http://community.cloudera.com/t5/Batch-Processing-and-Workflow/YARN-force-nobody-user-on-all-jobs-and-so-they-fail/m-p/82572/highlight/true#M3882
Пример:
val test = sc.parallelize(Seq("test user")).repartition(1)
val piped = test.pipe(Seq("whoami"))
val c = piped.collect()
est: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at repartition at <console>:25
piped: org.apache.spark.rdd.RDD[String] = PipedRDD[5] at pipe at <console>:25
c: Array[String] = Array(maziyar)
Это вернет username
который начал сеанс Spark после установки этих конфигов в yarn-site.xml
и синхронизации всех пользователей / групп среди всех узлов.