Полагаю, вам не нужны имена таблиц, которые относятся к внутренней работе postgres, такие как pg_type
, pg_policies
и т. Д., Схема которых имеет тип pg_catalog
, что вызывает ошибку
py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o34.jdbc.: java.sql.SQLException: неподдерживаемый тип ARRAY
, когда вы пытаетесь прочитать их как
spark.read.jdbc(url=jdbcUrl, table='pg_type', properties=connectionProperties)
, и существуют таблицы, такие как applicable_roles
, view_table_usage
и т. д., чьиСхема имеет тип information_schema
, который вызывает
py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o34.jdbc.: org.postgresql.util.PSQLException: ОШИБКА: отношение "view_table_usage" не существует
при попытке прочитать их как
spark.read.jdbc(url=jdbcUrl, table='view_table_usage', properties=connectionProperties)
Таблицы, типы схем которых являются общедоступнымиможет быть прочитан в таблицы с помощью вышеуказанных команд jdbc.
Я проверил тип table_name и это String, это правильный подход?
Так что вам нужно отфильтровать эти имена таблиц и применять вашу логику как
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Connect to DB") \
.getOrCreate()
jdbcUrl = "jdbc:postgresql://hostname:post/"
connectionProperties = {
"user" : " ",
"password" : " ",
"driver" : "org.postgresql.Driver"
}
query = "information_schema.tables"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
table_name_list = df.filter((df["table_schema"] != 'pg_catalog') & (df["table_schema"] != 'information_schema')).select("table_name").rdd.flatMap(lambda x: x).collect()
for table_name in table_name_list:
df2 = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)
Это должно работать