Чтобы извлечь данные из базы данных RDBMS (postgres), работая с scala и spark, я написал следующий код:
class GpMeta(spark:SparkSession, source_system_name:String, conParams:ParametersDto) {
import spark.implicits._
try {
Class.forName(conParams.getGpDriverClass()).newInstance()
}
catch {
case cnf: ClassNotFoundException =>
println("No class def found for Hive Metastore. Exiting the application....")
System.exit(1)
case e: Exception =>
println("No class def found for Hive Metastore. Exiting the application....")
System.exit(1)
}
def getControlTableData(): Unit = {
val cdf = spark.read.format("jdbc").option("url", conParams.getGpConUrl()).option("dbtable", s"(select target_table, id from schema.table where source_system_name='${source_system_name}' and transfer_status is null and tenant_code='HDFS') as controlTableDF").option("user", conParams.getGpUserName()).option("password", conParams.getGpPwd()).load()
cdf.persist(MEMORY_ONLY)
cdf.createOrReplaceTempView("cdfTemp")
}
Это работает отлично. Я пытаюсь повторить то же самое в Python (сейчас изучаю Python), но не понимаю, как создать экземпляр драйвера внутри файла .py. Я написал этот фрагмент кода:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import psycopg2
def get_interface_log_ids(spark):
idf = spark.read.format("jdbc").option("url","").option("user","user").option("password","pwd")option("dbtable",f"select target_table from schema.table where source_system_name='SSS' and transfer_status is null and tenant_code='HDFS' order by interface_log_id desc) as controlTableDF").load()
idf.persist("MEMORY_ONLY")
return idf
if __name__ == "__main__":
sparkConf = SparkConf().setAppName("DummyAP")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
Class.forName("org.postgresql.Driver")
get_interface_log_ids(spark)
Если я попытаюсь указать экземпляр драйвера в коде, как показано ниже: Class.forName("org.postgresql.Driver")
Я вижу сообщение об ошибке: Неразрешенная ссылка «Класс». Это правильный способ зарегистрировать драйвер в Python коде? Также я вижу эту ссылку для postgres python jdb c соединения. Если я должен использовать код в этой ссылке, где я могу указать Python и Spark, что я использую Postgres драйвер?
Может кто-нибудь сообщить мне, как я могу исправить свою ошибку?