Spark 2.3.1 insertinto table (s3) с разделениями выполняет много запросов перед фактической записью - PullRequest
0 голосов
/ 03 января 2019

У меня очень простая работа Spark, которая пишет в S3. В таблице есть 3 разных ключа секционирования и множество значений (некоторые из них увеличиваются с каждым часом).

Я использую следующий код:

dataframe.select(reorderFields:_*).write.mode(SaveMode.Overwrite).insertInto(tableName)

В начале этот код был довольно эффективным. Но после того, как стол стал больше, он становился все медленнее и медленнее.

Когда я открыл журнал отладки, я обнаружил, что многие операции чтения происходят до того, как он начинает вычислять фрейм данных.

ЛОГИ:

2019-01-03 16:50:58 [main] DataNucleus.Datastore:58 [DEBUG]: Closing PreparedStatement "com.jolbox.bonecp.PreparedStatementHandle@5470ec7e"
2019-01-03 16:50:58 [main] DataNucleus.Datastore.Native:58 [DEBUG]: SELECT `A0`.`COLUMN_NAME`,`A0`.`ORDER`,`A0`.`INTEGER_IDX` AS NUCORDER0 FROM `SORT_COLS` `A0` WHERE `A0`.`SD_ID` = <297323> AND `A0`.`INTEGER_IDX` >= 0 ORDER BY NUCORDER0
2019-01-03 16:50:58 [main] DataNucleus.Datastore.Retrieve:58 [DEBUG]: Execution Time = 1 ms
2019-01-03 16:50:58 [main] DataNucleus.Datastore:58 [DEBUG]: Closing PreparedStatement "org.datanucleus.store.rdbms.ParamLoggingPreparedStatement@325b1c61"
2019-01-03 16:50:58 [main] DataNucleus.Persistence:58 [DEBUG]: Object "org.apache.hadoop.hive.metastore.model.MStorageDescriptor@6328ec75" field "parameters" is replaced by a SCO wrapper of type "org.datanucleus.store.types.backed.Map" [cache-values=true, lazy-loading=true, queued-operations=false, allow-nulls=true]
2019-01-03 16:50:58 [main] DataNucleus.Persistence:58 [DEBUG]: Object "org.apache.hadoop.hive.metastore.model.MStorageDescriptor@6328ec75" field "parameters" loading contents to SCO wrapper from the datastore
2019-01-03 16:50:58 [main] DataNucleus.Connection:58 [DEBUG]: Connection found in the pool : org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl@236ec794 [conn=com.jolbox.bonecp.ConnectionHandle@712106b5, commitOnRelease=false, closeOnRelease=false, closeOnTxnEnd=true] for key=org.datanucleus.ExecutionContextThreadedImpl@132e3594 in factory=ConnectionFactory:tx[org.datanucleus.store.rdbms.ConnectionFactoryImpl@72c9ebfa]
2019-01-03 16:50:58 [main] DataNucleus.Datastore:58 [DEBUG]: Closing PreparedStatement "com.jolbox.bonecp.PreparedStatementHandle@250ebae4"
2019-01-03 16:50:58 [main] DataNucleus.Datastore.Native:58 [DEBUG]: SELECT `A0`.`PARAM_KEY`,`A0`.`PARAM_VALUE` FROM `SD_PARAMS` `A0` WHERE `A0`.`SD_ID` = <297323> AND `A0`.`PARAM_KEY` IS NOT NULL
2019-01-03 16:50:58 [main] DataNucleus.Datastore.Retrieve:58 [DEBUG]: Execution Time = 1 ms
2019-01-03 16:50:58 [main] DataNucleus.Datastore:58 [DEBUG]: Closing PreparedStatement "org.datanucleus.store.rdbms.ParamLoggingPreparedStatement@798a320"
2019-01-03 16:50:58 [main] DataNucleus.Persistence:58 [DEBUG]: Object "org.apache.hadoop.hive.metastore.model.MStorageDescriptor@6328ec75" field "skewedColNames" is replaced by a SCO wrapper of type "org.datanucleus.store.types.backed.List" [cache-values=true, lazy-loading=true, queued-operations=false, allow-nulls=true]
2019-01-03 16:50:58 [main] DataNucleus.Persistence:58 [DEBUG]: Object "org.apache.hadoop.hive.metastore.model.MStorageDescriptor@6328ec75" field "skewedColNames" loading contents to SCO wrapper from the datastore
2019-01-03 16:50:58 [main] DataNucleus.Connection:58 [DEBUG]: Connection found in the pool : org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl@236ec794 [conn=com.jolbox.bonecp.ConnectionHandle@712106b5, commitOnRelease=false, closeOnRelease=false, closeOnTxnEnd=true] for key=org.datanucleus.ExecutionContextThreadedImpl@132e3594 in factory=ConnectionFactory:tx[org.datanucleus.store.rdbms.ConnectionFactoryImpl@72c9ebfa]
2019-01-03 16:50:58 [main] DataNucleus.Datastore:58 [DEBUG]: Closing PreparedStatement "com.jolbox.bonecp.PreparedStatementHandle@540637b0"

Я попытался перенастроить свой улей со следующими параметрами:

sparkConf.set("hive.auto.convert.join.noconditionaltask.size","200M")
sparkConf.set("hive.auto.convert.join.noconditionaltask","true")
sparkConf.set("hive.optimize.sort.dynamic.partition","false")
sparkConf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")

Также добавлено в hive.xml

  <property>
    <name>hive.stats.autogather</name>
    <value>false</value>
  </property>

Но все равно действует так же.

Я не работаю с HDFS.

Буду признателен за любое предложение ??

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...