Pyspark: чтение данных из таблицы и запись в файл - PullRequest
0 голосов
/ 24 апреля 2020

Я использую искровой кластер HDInsight для запуска кода Pyspark. Я пытаюсь прочитать данные из таблицы postgres и записать в файл, как показано ниже. pgsql_df возвращает DataFrameReader вместо DataFrame. Поэтому я не могу записать DataFrame в файл. Почему «spark.read» возвращает DataFrameReader. Что мне здесь не хватает?

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as dbpull
from datetime import datetime
from pyspark.sql.types import Row
from pyspark.sql import DataFrame
from pyspark.sql import DataFrameReader
from pyspark.sql import DataFrameWriter
import random
import string
from pyspark.sql.functions import *
import sys
spark=SparkSession.builder.master("local").appName("db pull").getOrCreate()
pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")```

>>>pgsql_df
<pyspark.sql.readwriter.DataFrameReader object at 0x7fb43ce1f890>


pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)


**Error:** 
 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'write'





1 Ответ

0 голосов
/ 24 апреля 2020

Пожалуйста, проверьте ниже код. Вы не можете вызвать load () для объекта DataFrameReader.

pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")
    .load() // this is missing 

pgsql_df.write.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

or 


pgsql_df=spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql://<hostdetails>") \
    .option("dbtable", "table") \
    .option("user", "user") \
    .option("password", "password")

pgsql_df
.load() \ added here 
.write. \
.format("csv").mode("overwrite").options(sep=",", header="true").save(path=output)

...