Я пытаюсь сделать PO C в pyspark по очень простому требованию. В качестве первого шага я просто пытаюсь скопировать записи таблицы из одной таблицы в другую. Существует более 20 таблиц, но сначала я пытаюсь сделать это только для одной таблицы, а затем расширить ее до нескольких таблиц.
Приведенный ниже код работает нормально, когда я пытаюсь скопировать только 10 записей . Но когда я пытаюсь скопировать все записи из основной таблицы, этот код застревает, и в конечном итоге мне приходится прекращать его вручную. Поскольку в основной таблице 1 миллион записей, я ожидал, что это произойдет через несколько секунд, но это просто не было завершено.
Spark UI:
введите описание изображения здесь
Не могли бы вы подсказать, как мне с этим справиться?
Host : Local Machine
Spark verison : 3.0.0
database : Oracle
Код:
from pyspark.sql import SparkSession
from configparser import ConfigParser
#read configuration file
config = ConfigParser()
config.read('config.ini')
#setting up db credentials
url = config['credentials']['dbUrl']
dbUsr = config['credentials']['dbUsr']
dbPwd = config['credentials']['dbPwd']
dbDrvr = config['credentials']['dbDrvr']
dbtable = config['tables']['dbtable']
#print(dbtable)
# database connection
def dbConnection(spark):
pushdown_query = "(SELECT * FROM main_table) main_tbl"
prprDF = spark.read.format("jdbc")\
.option("url",url)\
.option("user",dbUsr)\
.option("dbtable",pushdown_query)\
.option("password",dbPwd)\
.option("driver",dbDrvr)\
.option("numPartitions", 2)\
.load()
prprDF.write.format("jdbc")\
.option("url",url)\
.option("user",dbUsr)\
.option("dbtable","backup_tbl")\
.option("password",dbPwd)\
.option("driver",dbDrvr)\
.mode("overwrite").save()
if __name__ =="__main__":
spark = SparkSession\
.builder\
.appName("DB refresh")\
.getOrCreate()
dbConnection(spark)
spark.stop()