Как избежать ошибки синтаксического анализа при записи файла orc, поскольку он использует pyspark? - PullRequest
0 голосов
/ 30 марта 2019

Я читаю два кадра данных в формате orc (файлы уже в формате orc). Затем я записываю в другой путь s3, первый кадр данных, в формате orc. Выдает ошибку исключения Parsing вместе с именами столбцов данных.

Я просмотрел некоторые вопросы, но у них был ответ, относящийся к улью, и ни на один из вопросов о чистом pyspark не было дано ответов.

from athena_processing import AthenaContext
from datetime import datetime
from awsglue.utils import getResolvedOptions
import sys

athena_context = AthenaContext()
# spark context is already created for you as athena_context.sc, spark session is already created for you as athena_context.spark
# include this line in your code: spark = athena_context.spark

spark = athena_context.spark

# get two input names


# get output name
#output_path = getResolvedOptions(sys.argv, ['output_path'])['output_path']

# df = athena_context.input('S3')
# df2 = athena_context.input('S3_1')
df = spark.read.orc('s3://my-bucket/pipelines/march302_AkdmcrCTp/manual__2019-03-29T20_19_22+00_00/aws-s3_8LI6nrCTp')
df2 = spark.read.orc('s3://my-bucket/pipelines/march302_AkdmcrCTp/manual__2019-03-29T20_19_22+00_00/aws-s3_-K5Rn9CHM')
# df = spark.read.csv(
#     's3://my-bucket/one_billion_hundred_columns.csv',
#     header=True
# )
# print("First dataframe read with headers set to True")

# df2 = athena_context.input('5c9e2249186c5d0006b36fae_manual__2019_03_29T13_52_15_00_00')
# df2 = spark.read.orc(
#     's3://my-bucket/opendata/gdelt/gdelt_100m'
# )

# df2 = spark.read.csv(
#     's3://my-bucket/one_billion_hundred_columns.csv',
#     header=True
# )

print("Second dataframe read with headers set to True")

# Obtain columns lists
left_cols = df.columns
right_cols = df2.columns

# Prefix each dataframe's field with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])

# Perform join
# df3 = df.alias('l').join(df2.alias('r'), on='l.left_c_0' == 'r.right_c_0')

# df3 = df.alias('l').join(df2.alias('r'), on='c_0')

# df3 = df.join(
#     df2,
#     df["left_column_test_0"] == df2["right_column_test_0"]
# )

# df3 = df.join(
#     df2,
#     df["left__c0"] == df2["right__c0"]
# )

# df3 = df3.repartition("left_column_test_0")

print("Dataframes have been joined and the resultant dataframe has been partitioned successfully.")

# output_file_path = 's3://my-bucket/temp-de-avi/output_temp_de_avi/gdelt_100m/march_26_job/_1'
output_file_path = 's3://my-bucket/pipelines/march302_AkdmcrCTp/manual__2019-03-29T20_19_22+00_00/processing_1wuRc9CHp'
print(output_file_path)
df.write.orc(
    output_file_path
)


# print("Dataframe has been written to csv.")
athena_context.commit()

Я ожидаю успешной записи файла orc, но вместо этого получаю следующую ошибку от AWS Glue:

ParseException: u"\nextraneous input 'type' expecting ':'(line 1, pos 41)\n\n== SQL ==\nstruct<region:string,country:string,item type:string,sales channel:string,order priority:string,order date:timestamp,order id:int,ship date:timestamp,units sold:int,unit price:double,unit cost:double,total revenue:double,total cost:double,total profit:double>\n-----------------------------------------^^^\n"```
...