Spark в среде Kerberized Had oop и включенной высокой доступности: Spark SQL может читать данные только после задачи записи - PullRequest
0 голосов
/ 22 апреля 2020

Мы долгое время использовали керберизованную среду Had oop (HDP 3.1.4 с Spark 2.3.2 и Ambari 2.7.4), пока все шло хорошо.

Теперь мы включили NameNode высокой доступности и имеют следующую проблему: Когда мы хотим прочитать данные с помощью Spark SQL, нам сначала нужно записать некоторые (другие) данные. Если мы не напишем что-либо до операции чтения, она завершится неудачей.

Вот наш сценарий:

$ kinit -kt /etc/security/keytabs/user.keytab user
$ spark-shell
  1. Выполнить запрос на чтение -> Этот первый запрос на чтение за сеанс завершается неудачно !
scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
Hive Session ID = cbb6b6e2-a048-41e0-8e77-c2b2a7f52dbe
[Stage 0:>                                                          (0 + 1) / 1]20/04/22 15:04:53 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, had-data6.my-company.de, executor 2): java.io.IOException: DestHost:destPort had-job.my-company.de:8020 , LocalHost:localPort had-data6.my-company.de/192.168.178.123:0. Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:831)
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:806)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1502)
        at org.apache.hadoop.ipc.Client.call(Client.java:1444)
        at org.apache.hadoop.ipc.Client.call(Client.java:1354)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:317)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:862)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:851)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:840)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1004)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:320)
        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:316)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:328)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:899)
        at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:522)
        at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
        at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
        [...]
Запустить задание на запись -> Это работает!
scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> primitiveDS.write.saveAsTable("pm.todelete3")
20/04/22 15:05:07 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Теперь, сделайте то же самое снова -> Это работает (для того же сеанса)!?
scala> spark.sql("SELECT * FROM pm.simulation_uci_hydraulic_sensor").show
+--------+--------+--------------------+------+
|instance|sensorId|                  ts| value|
+--------+--------+--------------------+------+
|      21|     PS6|2020-04-18 17:19:...| 8.799|
|      21|    EPS1|2020-04-18 17:19:...|2515.6|
|      21|     PS3|2020-04-18 17:19:...| 2.187|
+--------+--------+--------------------+------+

При запуске нового сеанса spark-shell, то же поведение!

Может кто-нибудь помочь с этим вопросом? Спасибо!

1 Ответ

1 голос
/ 22 апреля 2020

Мы нашли ответ на проблему: свойства таблицы указывали на «старое» расположение NameNode в таблице, которая была создана до активации High Availability в кластере Had oop.

Вы можете найти таблицу информацию, выполнив следующую команду:

$ spark-shell
scala> spark.sql("DESCRIBE EXTENDED db.table").show(false)

Это показывает информацию таблицы, как в моем случае:

+----------------------------+---------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                    |comment|
+----------------------------+---------------------------------------------------------------------------------------------+-------+
|instance                    |int                                                                                          |null   |
|sensorId                    |string                                                                                       |null   |
|ts                          |timestamp                                                                                    |null   |
|value                       |double                                                                                       |null   |
|                            |                                                                                             |       |
|# Detailed Table Information|                                                                                             |       |
|Database                    |simulation                                                                                   |       |
|Table                       |uci_hydraulic_sensor_1                                                                       |       |                                                                                                                          |       |
|Created By                  |Spark 2.3.2.3.1.4.0-315                                                                      |       |
|Type                        |EXTERNAL                                                                                     |       |
|Provider                    |parquet                                                                                      |       |
|Statistics                  |244762020 bytes                                                                              |       |
|Location                    |hdfs://had-job.mycompany.de:8020/projects/pm/simulation/uci_hydraulic_sensor_1       <== This is important!
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe                                  |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat                                |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat                               |       |
+----------------------------+---------------------------------------------------------------------------------------------+-------+

Чтобы задать новое расположение таблицы с именем службы кластера HA, выполните следующую команду SQL:

$ spark-shell
scala> spark.sql("ALTER TABLE simulation.uci_hydraulic_sensor_1 SET LOCATION 'hdfs://my-ha-name/projects/pm/simulation/uci_hydraulic_sensor_1'")

В последующих сеансах Spark чтение таблицы работает отлично!

...