Я использую Spark SQL в скрипте AWS Glue для преобразования некоторых данных в S3.Вот логика скрипта
Формат данных CSV
Язык программирования: Python
1) Извлечь данные из S3, используя каталог Glue, в DynamicDataFrame
2 Glue) Извлеките фрейм данных Spark из фрейма данных Glue с помощью toDF ()
3) Создайте Spark SQL-таблицу фрейма данных Spark
createOrReplaceTempView()
4) Используйте SQL-запрос для преобразования (вот гдеУ меня возникли проблемы)
5) Преобразовать окончательный фрейм данных в Glue Dynamic Data Frame
6) Сохранить окончательный фрейм данных в S3, используя glueContext.write_dynamic_frame.from_options()
Проблема
Когда я использую сравнение в SQL, например WHERE> или (case when <some_columns> > <some int> then 1 else 0 end) as <some_newcol>
, я получаю следующую ошибку
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` >
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` >
100000)' (struct<int:int,string:string> and int).; line 1 pos 35;\n'Project
['demand_amt]\n+- 'Filter (cxvalue#4 > 100000)\n +- SubqueryAlias sales\n +-
LogicalRDD [sales_id#0, customer_name#1, customer_loc#2, demand_amt#3L,
cxvalue#4]\n"
pyspark.sql.utils.AnalysisException: u"cannot resolve '(sales.`cxvalue` =
100000)' due to data type mismatch: differing types in '(sales.`cxvalue` =
100000)' (struct<int:int,string:string> and int).; line 1 pos 33;\n'Project
[customer_name#1, CASE WHEN (cxvalue#4 = 100000) THEN demand_amt#3 ELSE 0 END AS
small#12, CASE WHEN cxvalue#4 IN (200000,300000,400000) THEN demand_amt#3 ELSE 0
END AS medium#13]\n+- SubqueryAlias sales\n +- LogicalRDD [sales_id#0,
customer_name#1, customer_loc#2, demand_amt#3, cxvalue#4]\n"
Это говорит мне, что рассматривает столбцы какчисловые и строковые, это относится только к Spark, а не к AWS.SUM () GROUP BY работает нормально только сравнение
Я попробовал следующие шаги
1) Попытался изменить тип столбца с помощью метода Spark - Не удалось
df=df.withColumn(<column> df[<columns>].cast(DoubleType())) # df is Spark Data
111
Клейне позволяет изменять тип данных типа столбца фрейма искровых данных
2) Используется метод resoveChoice Glue, как описано в https://github.com/aws-samples/aws-gluesamples/ blob / master / examples / resol_choice.md.Метод resolChoice сработал, но sql не удалось с той же ошибкой
3) Использовано cast(<columns> as <data_type>)
в запросе SQL - Сбой
4) Ускорение Spark Cluster в моем облаке Google (просто чтобы ничего не гарантировать AWSсвязанные с).Использовать Spark только с такой же логикой - Сбой с той же ошибкой
5) На том же кластере Spark и в том же наборе данных использовалась та же логика, но принудительная схема с использованием StructType
и StructField
при создании новых данных Sparkframe - Passed
Вот примерные данные
+--------+-------------+------------+----------+-------+
|sales_id|customer_name|customer_loc|demand_amt|cxvalue|
+--------+-------------+------------+----------+-------+
| 1| ABC| Denver CO| 1200| 300000|
| 2| BCD| Boston MA| 212| 120000|
| 3| CDE| Phoenix AZ| 332| 100000|
| 4| BCD| Boston MA| 211| 120000|
| 5| DEF| Portland OR| 2121|1000000|
| 6| CDE| Phoenix AZ| 32| 100000|
| 7| ABC| Denver CO| 3227| 300000|
| 8| DEF| Portland OR| 2121|1000000|
| 9| BCD| Boston MA| 21| 120000|
| 10| ABC| Denver CO| 1200|300000 |
+--------+-------------+------------+----------+-------+
Это примеры кода и запросов, когда что-то не получается
sdf_sales.createOrReplaceTempView("sales")
tbl1="sales"
sql2="""select customer_name, (case when cxvalue < 100000 then 1 else 0) as small,
(case when cxvalue in (200000, 300000, 400000 ) then demand_amt else 0 end) as medium
from {0}
""".format(tbl1)
sql4="select demand_amt from {0} where cxvalue>100000".format(tbl1)
Однако эти запросы прекрасно работают с успешным GlueРабота
sql3="""select customer_name, sum(demand_amt) as total_spent from {0} GROUP BY customer_name""".format(tbl1)
Задача: Хотелось бы, чтобы клей как-то позволил мне изменить схему Spark Dataframe.Любое предложение будет оценено.