Почему pyspark sql запрос против S3 возвращает нули - PullRequest
0 голосов
/ 19 января 2019

Я получаю разные результаты при выполнении одного и того же запроса в Афине для источника S3 по сравнению с выполнением его из скрипта pyspark в кластере EMR (1 x 10). Я получаю данные от Афины, но все, что я получаю, это нулевые значения со сценарием. Любые предложения / мысли / догадки о том, почему?

Вот запрос Афины:

SELECT <real_col1> as reg_plate, <real_col1> as model_num
FROM <my Athena table name> 
WHERE partition_datetime LIKE '2019-01-01-14' 
limit 10

Что возвращает этот результат:

reg_plate   model_num
   515355  961-824
   515355  961-824
   515355  961-824
   515355  961-824
   341243  047-891
   727027  860-403
   619656  948-977
   576345  951-657
   576345  951-657
   113721  034-035

Однако, когда я запускаю этот запрос как скрипт, для того же источника S3 с:

# Define SQL query
load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num
FROM s3_table
WHERE partition_datetime LIKE '2019-01-01-14'
limit 10  """

df1 = spark.read.parquet("<s3:path to my data>")
df1.createOrReplaceTempView("s3_table")

sqlDF = spark.sql(load_qry)
sqlDF.show(10)

Я получаю только нули, вот так

+---------+---------+
|reg_plate|model_num|
+---------+---------+
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
|     null|     null|
+---------+---------+

Вот конфигурация в моем кластере, которая состоит из 1 главного сервера r3.xlarge и 10 рабочих r3.xlarge: Cluster config

Вот командная строка, которую я использую для запуска задания спарка: PYSPARK_PYTHON=/opt/miniconda/bin/python nohup spark-submit --driver- memory 8g --executor-memory 30g --conf spark.executor.cores=8 --conf spark.driver.maxResultSize=8g --conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER --conf spark.debug.maxToStringFields=100 --conf spark.sql.hive.convertMetastoreParquet=false stk_overflow.py > stk_oflow0120.txt 2>&1

1 Ответ

0 голосов
/ 29 января 2019

Я нашел простое решение.

Вместо

load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num 
FROM s3_table WHERE partition_datetime LIKE '2019-01-01-14' limit 10 """ 
df1 = spark.read.parquet("<s3:path to my data>") 
df1.createOrReplaceTempView("s3_table") 

Я использовал

load_qry = """SELECT <real_col1> as reg_plate, <real_col2> as model_num
FROM <my_athena_db>.table WHERE partition_datetime LIKE '2019-01-01-14' 
df1 = spark.sql(load_qry)

Что работает, потому что Клей знает, как добраться до "my_athena_db.table"

...