Где мы регистрируем драйвер в программе PySpark? - PullRequest
0 голосов
/ 09 апреля 2020

Чтобы извлечь данные из базы данных RDBMS (postgres), работая с scala и spark, я написал следующий код:

class GpMeta(spark:SparkSession, source_system_name:String, conParams:ParametersDto) {
    import spark.implicits._
    try {
        Class.forName(conParams.getGpDriverClass()).newInstance()
    }
    catch {
        case cnf: ClassNotFoundException =>
            println("No class def found for Hive Metastore. Exiting the application....")
            System.exit(1)
        case e: Exception =>
            println("No class def found for Hive Metastore. Exiting the application....")
            System.exit(1)
    }
    def getControlTableData(): Unit = {
        val cdf = spark.read.format("jdbc").option("url", conParams.getGpConUrl()).option("dbtable", s"(select target_table, id from schema.table where source_system_name='${source_system_name}' and transfer_status is null and tenant_code='HDFS') as controlTableDF").option("user", conParams.getGpUserName()).option("password", conParams.getGpPwd()).load()
        cdf.persist(MEMORY_ONLY)
        cdf.createOrReplaceTempView("cdfTemp")
}

Это работает отлично. Я пытаюсь повторить то же самое в Python (сейчас изучаю Python), но не понимаю, как создать экземпляр драйвера внутри файла .py. Я написал этот фрагмент кода:

from pyspark import SparkConf
from pyspark.sql import SparkSession
import psycopg2 

def get_interface_log_ids(spark):
    idf = spark.read.format("jdbc").option("url","").option("user","user").option("password","pwd")option("dbtable",f"select target_table from schema.table where source_system_name='SSS' and transfer_status is null and tenant_code='HDFS' order by interface_log_id desc) as controlTableDF").load()
    idf.persist("MEMORY_ONLY")
    return idf


if __name__ == "__main__":
    sparkConf = SparkConf().setAppName("DummyAP")
    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
    Class.forName("org.postgresql.Driver")
    get_interface_log_ids(spark)

Если я попытаюсь указать экземпляр драйвера в коде, как показано ниже: Class.forName("org.postgresql.Driver") Я вижу сообщение об ошибке: Неразрешенная ссылка «Класс». Это правильный способ зарегистрировать драйвер в Python коде? Также я вижу эту ссылку для postgres python jdb c соединения. Если я должен использовать код в этой ссылке, где я могу указать Python и Spark, что я использую Postgres драйвер?

Может кто-нибудь сообщить мне, как я могу исправить свою ошибку?

Ответы [ 2 ]

1 голос
/ 10 апреля 2020

Обновить мой предыдущий ответ. Необходимо два свойства:

"spark.jars" и "spark.driver.extraClassPath",

So; код должен выглядеть так:

conf = SparkConf()
    .setAppName("PostGreSQL")
    .setMaster("local[*]")
    .set("spark.jars", "path/to/jar/postgresql-9.1-901-1.jdbc4.jar")
    .set("spark.driver.extraClassPath", "path/to/jar/lib/postgresql-9.1-901-1.jdbc4.jar") 

spark = SparkSession
    .builder
    .config(conf=conf)
    .getOrCreate()

jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://*.*.*.*/")
    .option("dbtable", '"Test"."Table1"')
    .option("user", "postgres")
    .option("password", "******")
    .load()
jdbcDF.registerTempTable("PostGreSQL")
spark.sql(""" SELECT * FROM PostgreSQL """).show()
1 голос
/ 09 апреля 2020

Присвойте 'spark conf' свойство с именем "spark.jars" и значение абсолютного пути кувшина, например:

    SparkConf() 
       .setAppName("DummyAP")
       .set("spark.jars", "/where/your/driver/located/postgresql-*-*-*.jdbc4.jar") 

Также; Вы можете проверить официальные документы здесь , что может быть полезно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...