Неподдерживаемая ошибка массива при чтении источника JDBC в (Py) Spark? - PullRequest
0 голосов
/ 31 мая 2018

Попытка преобразовать базу данных postgreSQL в Dataframe.Вот мой код:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Connect to DB") \
    .getOrCreate()

jdbcUrl = "jdbc:postgresql://XXXXXX" 
connectionProperties = {
  "user" : " ",
  "password" : " ",
  "driver" : "org.postgresql.Driver"
}

query = "(SELECT table_name FROM information_schema.tables) XXX"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)

table_name_list = df.select("table_name").rdd.flatMap(lambda x: x).collect() 
    for table_name in table_name_list:
          df2 = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

Я получаю ошибку:

java.sql.SQLException: неподдерживаемый тип ARRAY при генерации df2 для имени таблицы

Если я жестко кодирую значение имени таблицы, я не получаю ту же ошибку

df2 = spark.read.jdbc(jdbcUrl,"conditions",properties=connectionProperties) 

Я проверил тип table_name, и это String, это правильный подход?

1 Ответ

0 голосов
/ 31 мая 2018

Полагаю, вам не нужны имена таблиц, которые относятся к внутренней работе postgres, такие как pg_type, pg_policies и т. Д., Схема которых имеет тип pg_catalog, что вызывает ошибку

py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o34.jdbc.: java.sql.SQLException: неподдерживаемый тип ARRAY

, когда вы пытаетесь прочитать их как

spark.read.jdbc(url=jdbcUrl, table='pg_type', properties=connectionProperties)

, и существуют таблицы, такие как applicable_roles, view_table_usage и т. д., чьиСхема имеет тип information_schema, который вызывает

py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o34.jdbc.: org.postgresql.util.PSQLException: ОШИБКА: отношение "view_table_usage" не существует

при попытке прочитать их как

spark.read.jdbc(url=jdbcUrl, table='view_table_usage', properties=connectionProperties)

Таблицы, типы схем которых являются общедоступнымиможет быть прочитан в таблицы с помощью вышеуказанных команд jdbc.

Я проверил тип table_name и это String, это правильный подход?

Так что вам нужно отфильтровать эти имена таблиц и применять вашу логику как

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Connect to DB") \
    .getOrCreate()

jdbcUrl = "jdbc:postgresql://hostname:post/" 
connectionProperties = {
  "user" : " ",
  "password" : " ",
  "driver" : "org.postgresql.Driver"
}

query = "information_schema.tables"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)

table_name_list = df.filter((df["table_schema"] != 'pg_catalog') & (df["table_schema"] != 'information_schema')).select("table_name").rdd.flatMap(lambda x: x).collect() 
    for table_name in table_name_list:
          df2 = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

Это должно работать

...