Скопируйте записи из одной таблицы в другую, используя spark- sql -jdbc - PullRequest
0 голосов
/ 03 августа 2020

Я пытаюсь сделать PO C в pyspark по очень простому требованию. В качестве первого шага я просто пытаюсь скопировать записи таблицы из одной таблицы в другую. Существует более 20 таблиц, но сначала я пытаюсь сделать это только для одной таблицы, а затем расширить ее до нескольких таблиц.

Приведенный ниже код работает нормально, когда я пытаюсь скопировать только 10 записей . Но когда я пытаюсь скопировать все записи из основной таблицы, этот код застревает, и в конечном итоге мне приходится прекращать его вручную. Поскольку в основной таблице 1 миллион записей, я ожидал, что это произойдет через несколько секунд, но это просто не было завершено.

Spark UI:

введите описание изображения здесь

Не могли бы вы подсказать, как мне с этим справиться?

Host : Local Machine
Spark verison : 3.0.0
database : Oracle

Код:

from pyspark.sql import SparkSession
from configparser import ConfigParser

#read configuration file
config  = ConfigParser()
config.read('config.ini')

#setting up db credentials
url     = config['credentials']['dbUrl']
dbUsr   = config['credentials']['dbUsr']
dbPwd   = config['credentials']['dbPwd']
dbDrvr  = config['credentials']['dbDrvr']
dbtable = config['tables']['dbtable']

#print(dbtable)

# database connection 
def dbConnection(spark):

    pushdown_query = "(SELECT * FROM main_table) main_tbl"
    prprDF = spark.read.format("jdbc")\
        .option("url",url)\
        .option("user",dbUsr)\
        .option("dbtable",pushdown_query)\
        .option("password",dbPwd)\
        .option("driver",dbDrvr)\
        .option("numPartitions", 2)\
        .load()



    prprDF.write.format("jdbc")\
        .option("url",url)\
        .option("user",dbUsr)\
        .option("dbtable","backup_tbl")\
        .option("password",dbPwd)\
        .option("driver",dbDrvr)\
        .mode("overwrite").save()


if __name__ =="__main__":
    
    spark = SparkSession\
            .builder\
            .appName("DB refresh")\
            .getOrCreate()

    dbConnection(spark)
    spark.stop()

1 Ответ

0 голосов
/ 04 августа 2020

Похоже, вы используете только один поток (исполнитель) для обработки данных с помощью соединения JDB C. Можете ли вы проверить исполнителей и сведения о драйверах в Spark UI и попробовать увеличить ресурсы. Также поделитесь ошибкой, из-за которой происходит сбой. Вы можете получить это из того же пользовательского интерфейса или использовать CLI для регистрации «yarn logs -applicationId»

...