Запись фрейма данных Pyspark в sql производительность сервера - PullRequest
2 голосов
/ 19 сентября 2019

Я пытаюсь экспортировать таблицу кустов на сервер sql, используя pyspark.Пожалуйста, смотрите ниже код.

from pyspark import SparkContext
from pyspark import HiveContext
SparkContext.setSystemProperty('spark.jars', '/usr/hdp/current/spark2-client/jars/sqljdbc42.jar')
sc = SparkContext()
hc = HiveContext(sc)
df = hive.sql("SELECT * FROM db.table where currentrecordindicator='Y' limit 1000000")
df.write.mode("overwrite").format("jdbc")
.option("url","jdbc:sqlserver://ser:1433;databasename=db1")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("dbtable", "stg.table")
.option("batchsize","20000")
.option("user","user")
.option("password","pws")
.option("numpartitions","5").save()

Используется ниже команды spark-submit для запуска.

 spark-submit --master yarn 
 --num-executors 5 
 --executor-cores 15 
 --executor-memory 10g  
 --conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null 
 --conf spark.rpc.message.maxSize=200  
 spark_table.py

Ниже приведен журнал вышеуказанного задания.

INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 11) in 56637 ms on p05.pd.com (executor 1) (19/20)
INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 3) in 56651 ms on p05.pd.com (executor 1) (20/20)
INFO DAGScheduler: ShuffleMapStage 0 (save at NativeMethodAccessorImpl.java:0) finished in 56.681 s
INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO DAGScheduler: looking for newly runnable stages
INFO DAGScheduler: running: Set()
INFO DAGScheduler: waiting: Set(ResultStage 1)
INFO DAGScheduler: failed: Set()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[9] at save at NativeMethodAccessorImpl.java:0), which has no missing parents
INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 43.2 KB, free 365.3 MB)
INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 15.4 KB, free 365.3 MB)
INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on X.X.X.X.50:57398 (size: 15.4 KB, free: 366.2 MB)
INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:996
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[9] at save at NativeMethodAccessorImpl.java:0)
INFO YarnScheduler: Adding task set 1.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 20, p03.pd.com, executor 4, partition 0, NODE_LOCAL, 5963 bytes)
INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on p03.pd.com:45793 (size: 15.4 KB, free: 5.2 GB)
INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to X.X.X.X:60768
**INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 253 bytes**
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 20) in 431138 ms on p03.pd.com (executor 4) (1/1)
INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
INFO DAGScheduler: ResultStage 1 (save at NativeMethodAccessorImpl.java:0) finished in 431.140 s
INFO DAGScheduler: Job 0 finished: save at NativeMethodAccessorImpl.java:0, took 488.105556 s
INFO SparkContext: Invoking stop() from shutdown hook

После этой информации в журнале " INFO MapOutputTrackerMaster: размер выходных статусов для случайного числа 0 составляет 253 байта"моя работа зависает на 8 минут и завершается.

Я думаю, что некоторые из них, где моя конфигурация пошла не так, и экспорт 1М записей занимает много времени.

Может ли кто-нибудь помочь мнеулучшить показатели моей работы.У меня есть 31M записей с 9GB данными в таблице. Я показал пример с 1M записями.

Примечание: В текущей среде prod мы используем pyodbc для экспорта, что очень медленно при чтении таблицы кустов из-за того, что мы делаемНекоторое POC вызывает искру в улучшении производительности.

Пожалуйста, посмотрите на Проблема производительности при чтении данных из улья с использованием python

Спасибо.

...