Как записать в HDFS, используя API программирования spark, если у меня есть данные для аутентификации? - PullRequest
0 голосов
/ 29 апреля 2018

Мне нужно записать во внешний кластер HDFS, данные аутентификации которого доступны как для простой, так и для аутентификации kerberos. Для простоты предположим, что мы имеем дело с простой аутентификацией.

Вот что у меня есть:

  • Сведения о подключении к внешнему кластеру HDFS (хост, порт)
  • Детали аутентификации (пользователь для простой аутентификации)
  • Местоположение HDFS, куда должны быть записаны файлы (hdfs: // host: port / loc)
  • Кроме того, другие детали, такие как формат и т. Д.

Обратите внимание, что пользователь SPARK отличается от пользователя, указанного для аутентификации HDFS.

Теперь, используя API программирования spark, вот что я пытаюсь сделать:

val hadoopConf =  new Configuration()
hadoopConf.set("fs.defaultFS", fileSystemPath)
hadoopConf.set("hadoop.job.ugi", userName)
val jConf = new JobConf(hadoopConf)
jConf.setUser(user)
jConf.set("user.name", user)
jConf.setOutputKeyClass(classOf[NullWritable])
jConf.setOutputValueClass(classOf[Text])
jConf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])

outputDStream.foreachRDD(r => {
                val rdd = r.mapPartitions { iter =>
                    val text = new Text()
                    iter.map { x =>
                        text.set(x.toString)
                        println(x.toString)
                        (NullWritable.get(), text)
                    }
                }

                val rddCount = rdd.count()
                if(rddCount > 0) {
                    rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf)
                }
            })

Здесь я предполагал, что если мы передадим JobConf с правильными деталями, он должен использоваться для аутентификации, а запись должна выполняться с использованием пользователя, указанного в JobConf.

Тем не менее, запись по-прежнему происходит как пользователь спарка ("root") независимо от деталей аутентификации, представленных в JobConf ("hdfs" как пользователь) . Ниже приводится исключение, которое я получаю:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode="/spark-deploy/out/_temporary/0":hdfs:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3900)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:978)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy40.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:558)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy41.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3000)
... 45 more

Пожалуйста, дайте мне знать, если есть какие-либо предложения.

1 Ответ

0 голосов
/ 16 мая 2018

Вероятно, это скорее комментарий, чем ответ, но так как он слишком длинный, я поместил его здесь. Я не пробовал это, потому что у меня нет среды, чтобы проверить это. Пожалуйста, попробуйте и дайте мне знать, если это работает (и если это не так, я удалю этот ответ).

Глядя немного в код, он выглядит так: DFSClient создает прокси , используя createProxyWithClientProtocol , который использует UserGroupInformation.getCurrentUser() (я не отслеживал ветку createHAProxy вниз, но Я подозреваю ту же логику там). Затем эта информация отправляется на сервер для аутентификации.

Это означает, что вам нужно изменить то, что UserGroupInformation.getCurrentUser() возвращает в контексте вашего конкретного вызова. Это то, что UserGroupInformation.doAs должно делать, поэтому вам просто нужно получить правильный экземпляр UserGroupInformation. А в случае простой аутентификации UserGroupInformation.createRemoteUser может действительно работать.

Поэтому я предлагаю попробовать что-то вроде этого:

...
val rddCount = rdd.count()
if(rddCount > 0) {
    val remoteUgi = UserGroupInformation.createRemoteUser("hdfsUserName")
    remoteUgi.doAs(() => { rdd.saveAsHadoopFile(config.outputPath, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], jConf) })
}
...