Загрузить таблицу spark sql в таблицу базы данных - PullRequest
0 голосов
/ 08 января 2020

Можно ли как-нибудь загрузить таблицу spark sql в таблицу базы данных, как мы это делали в sql

insert into database_table select * from sparksql_table.

pg_hook = PostgresHook(postgres_conn_id="ingestion_db", schema="ingestiondb")

connection = pg_hook.get_conn()

cursor = connection.cursor()

spark = SparkSession \
    .builder \
    .appName("Spark csv schema inference") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()\

Я могу запустить это:

spark. sql ("select * from MetadataTable"). Show ()

, но не это:

cursor.execute ("select * из таблицы метаданных ")

Ответы [ 2 ]

0 голосов
/ 08 января 2020

Вы можете найти этот python эквивалентный ответу @ Ghost9:

Инициализируйте сеанс искры с помощью пакета драйверов postgres, как показано ниже (Проверьте правильную версию) :

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars.packages", "postgresql-42.2.5.jar") \
    .getOrCreate()

Затем вы можете подключиться к jdb c с помощью следующей функции:

def connect_to_sql(
        spark, df, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    connection_details = {
        "user": username,
        "password": password,
        "driver": "org.postgresql.Driver",
    }

    df.write.jdbc(url=jdbc_url, table=data_table, mode="append", properties=connection_details)

Вы можете найти различные режимы:

    append: Append contents of this :class:DataFrame to existing data.
    overwrite: Overwrite existing data.
    ignore: Silently ignore this operation if data already exists.
    error (default case): Throw an exception if data already exists.
0 голосов
/ 08 января 2020

Открыть спарк-оболочку с этим пакетом спарк-оболочка --packages org. postgresql: postgresql: 42.1.1

val url = "jdbc:postgresql://localhost:5432/dbname"

 def getProperties: Properties ={
 val prop = new Properties
 prop.setProperty("user", "dbuser")
 prop.setProperty("password", "dbpassword")
 prop.setProperty("driver", "org.postgresql.Driver")
 prop
 }

val df = spark.sql("""select * from table  """)

df.write.mode("append").option("driver", "org.postgresql.Driver").jdbc(url, 
"tablename", getProperties)

После этого вы можете проверить таблицу в вашем postgres база данных. Кроме того, обратитесь к различным параметрам режима, которые предлагает искра, и выберите подходящий для вас.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...