Spark SQL на AWS Glue: pyspark.sql.utils.AnalysisException - PullRequest
0 голосов
/ 17 апреля 2019

Я использую Spark SQL в скрипте AWS Glue для преобразования некоторых данных в S3.Вот логика скрипта

Формат данных CSV

Язык программирования: Python

1) Извлечь данные из S3, используя каталог Glue, в DynamicDataFrame

2 Glue) Извлеките фрейм данных Spark из фрейма данных Glue с помощью toDF ()

3) Создайте Spark SQL-таблицу фрейма данных Spark

createOrReplaceTempView() 

4) Используйте SQL-запрос для преобразования (вот гдеУ меня возникли проблемы)

5) Преобразовать окончательный фрейм данных в Glue Dynamic Data Frame

6) Сохранить окончательный фрейм данных в S3, используя glueContext.write_dynamic_frame.from_options()

Проблема

Когда я использую сравнение в SQL, например WHERE> или (case when <some_columns> > <some int> then 1 else 0 end) as <some_newcol>

, я получаю следующую ошибку

pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` >
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` >
100000)' (struct<int:int,string:string> and int).; line 1 pos 35;\n'Project
['demand_amt]\n+- 'Filter (cxvalue#4 > 100000)\n +- SubqueryAlias sales\n +-
LogicalRDD [sales_id#0, customer_name#1, customer_loc#2, demand_amt#3L,
cxvalue#4]\n"
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` =
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` =
100000)' (struct<int:int,string:string> and int).; line 1 pos 33;\n'Project
[customer_name#1, CASE WHEN (cxvalue#4 = 100000) THEN demand_amt#3 ELSE 0 END AS
small#12, CASE WHEN cxvalue#4 IN (200000,300000,400000) THEN demand_amt#3 ELSE 0
END AS medium#13]\n+- SubqueryAlias sales\n +- LogicalRDD [sales_id#0,
customer_name#1, customer_loc#2, demand_amt#3, cxvalue#4]\n"

Это говорит мне, что рассматривает столбцы какчисловые и строковые, это относится только к Spark, а не к AWS.SUM () GROUP BY работает нормально только сравнение

Я попробовал следующие шаги

1) Попытался изменить тип столбца с помощью метода Spark - Не удалось

df=df.withColumn(<column> df[<columns>].cast(DoubleType())) # df is Spark Data
111

Клейне позволяет изменять тип данных типа столбца фрейма искровых данных

2) Используется метод resoveChoice Glue, как описано в https://github.com/aws-samples/aws-gluesamples/ blob / master / examples / resol_choice.md.Метод resolChoice сработал, но sql не удалось с той же ошибкой

3) Использовано cast(<columns> as <data_type>) в запросе SQL - Сбой

4) Ускорение Spark Cluster в моем облаке Google (просто чтобы ничего не гарантировать AWSсвязанные с).Использовать Spark только с такой же логикой - Сбой с той же ошибкой

5) На том же кластере Spark и в том же наборе данных использовалась та же логика, но принудительная схема с использованием StructType и StructField при создании новых данных Sparkframe - Passed

Вот примерные данные

+--------+-------------+------------+----------+-------+
|sales_id|customer_name|customer_loc|demand_amt|cxvalue|
+--------+-------------+------------+----------+-------+
|       1|          ABC|   Denver CO|      1200| 300000|
|       2|          BCD|   Boston MA|       212| 120000|
|       3|          CDE|  Phoenix AZ|       332| 100000|
|       4|          BCD|   Boston MA|       211| 120000|
|       5|          DEF| Portland OR|      2121|1000000|
|       6|          CDE|  Phoenix AZ|        32| 100000|
|       7|          ABC|   Denver CO|      3227| 300000|
|       8|          DEF| Portland OR|      2121|1000000|
|       9|          BCD|   Boston MA|        21| 120000|
|      10|          ABC|   Denver CO|      1200|300000 |
+--------+-------------+------------+----------+-------+

Это примеры кода и запросов, когда что-то не получается

sdf_sales.createOrReplaceTempView("sales")

tbl1="sales"

sql2="""select customer_name, (case when cxvalue  < 100000 then 1 else 0)  as small,   
(case when cxvalue  in (200000, 300000, 400000 ) then demand_amt else 0 end) as medium  
from {0}   
""".format(tbl1)

sql4="select demand_amt from {0} where cxvalue>100000".format(tbl1)

Однако эти запросы прекрасно работают с успешным GlueРабота

sql3="""select customer_name, sum(demand_amt) as total_spent from {0} GROUP BY  customer_name""".format(tbl1)

Задача: Хотелось бы, чтобы клей как-то позволил мне изменить схему Spark Dataframe.Любое предложение будет оценено.

1 Ответ

0 голосов
/ 19 апреля 2019

AWS Glue resolveChoice исправил проблему. Ошибка логики программирования: обработан Spark Frame как изменяемый

...