Сохранить схему информационного кадра в местоположении S3 - PullRequest
2 голосов
/ 21 июня 2019

Я читаю в файле паркета из S3 в блоках данных, используя следующую команду

df = sqlContext.read.parquet('s3://path/to/parquet/file')

Я хочу прочитать схему кадра данных, которую я могу сделать с помощью следующей команды:

df_schema = df.schema.json()

Но я не могу записать объект df_schama в файл на S3.Примечание: я открыт для того, чтобы не создавать файл json.Я просто хочу сохранить схему данных в любом типе файла (возможно, в текстовом файле) в AWS S3.

Я попытался написать схему json следующим образом:

df_schema.write.csv("s3://path/to/file")

или

a.write.format('json').save('s3://path/to/file')

Оба они дают мне следующие ошибки:

AttributeError: 'str' object has no attribute 'write'

Ответы [ 2 ]

0 голосов
/ 21 июня 2019

Вот рабочий пример сохранения схемы и применения ее к новым данным CSV:

# funcs
from pyspark.sql.functions import *
from pyspark.sql.types import *

# example old df schema w/ long datatype
df = spark.range(10)
df.printSchema()
df.write.mode("overwrite").csv("old_schema")

root
 |-- id: long (nullable = false)

# example new df schema we will save w/ int datatype
df = df.select(col("id").cast("int"))
df.printSchema()

root
 |-- id: integer (nullable = false)

# get schema as json object
schema = df.schema.json()

# write/read schema to s3 as .txt
import json

with open('s3:/path/to/schema.txt', 'w') as F:  
    json.dump(schema, F)

with open('s3:/path/to/schema.txt', 'r') as F:  
    saved_schema = json.load(F)

# saved schema
saved_schema
'{"fields":[{"metadata":{},"name":"id","nullable":false,"type":"integer"}],"type":"struct"}'

# construct saved schema object
new_schema = StructType.fromJson(json.loads(saved_schema))

new_schema
StructType(List(StructField(id,IntegerType,false)))

# use saved schema to read csv files ... new df has int datatype and not long
new_df = spark.read.csv("old_schema", schema=new_schema)
new_df.printSchema()
root
 |-- id: integer (nullable = true)

0 голосов
/ 21 июня 2019

df.schema.json() результаты string объект и string объекты не будут иметь .write метод.

In RDD Api:

df_schema = df.schema.json()

распараллелить df_schema переменную для создания rdd и затем использовать метод .saveAsTextFile для записи схемы в s3.

sc.parallelize([df_schema]).saveAsTextFile("s3://path/to/file")

(или)

In Dataframe Api:

from pyspark.sql import Row
df_schema = df.schema.json()
df_sch=sc.parallelize([Row(schema=df1)]).toDF()
df_sch.write.csv("s3://path/to/file")
df_sch.write.text("s3://path/to/file") //write as textfile
...