В основном я передаю динамические значения в pyspark SQL. Мой код подробно описан ниже:
set_sql = "".join(["set app_list_0 = 'app_3'"])
sqlContext.sql(set_sql)
click_app_join_sql = sqlContext.sql("select click_id, (case when app_new in ${app_list_0} then 1 else 0 END) as ${app_list_0}, device, os, channel from clickDF ")
click_app = sqlContext.sql(click_app_join_sql)
click_app.show(3)
Когда я запускаю свой код, я получаю следующую ошибку. Подскажите, пожалуйста, что не так в приведенном выше коде.
File "/home/saureddi/spark/data_process/hive_data_process.py", line 103, in <module>
click_app_join_sql = sqlContext.sql("select click_id, (case when app_new in ${app_list_0} then 1 else 0 END) as ${app_list_0}, device, os, channel from clickDF ")
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 384, in sql
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 603, in sql
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/2.6.4.0-91/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
pyspark.sql.utils.ParseException: u"\nno viable alternative at input '(case when app_new in 'app_3''(line 1, pos 39)\n\n== SQL ==\nselect click_id, (case when app_new in 'app_3' then 1 else 0 END) as 'app_3', device, os, channel from clickDF \n---------------------------------------^^^\n"
Код:
app_sql = "".join(["select a.app, CONCAT('app', '_',a.app) as new_app, a.app_count from(select app, count(app) as app_count from dblclk_text.click_data group by app )a order by a.app_count desc limit 5"])
df1 = hiveContext.sql(app_sql)
df1.createOrReplaceTempView('app')
df1_new_app = sqlContext.sql("select new_app from app ")
df1_new_app.printSchema()
app_list = []
app_result = df1_new_app.collect()
print(type(app_result))
for app in app_result:
app_list.append(app)
click_sql = "select click_id,CONCAT ('app', '_', app) as app_new, device, os, channel from dblclk_text.click_pank_data"
clickDF = hiveContext.sql(click_sql)
clickDF.createOrReplaceTempView('clickDF')
app_list_0 = str(app_list[0])
#app_list_0 = 'app_3'
print (app_list_0)
sample_sql = '''select click_id, (case when app_new in {0} then 1 else 0 END) as {0}, device, os, channel from clickDF '''.format(app_list_0)
click_app_join_sql = sqlContext.sql(sample_sql)
click_app = sqlContext.sql(click_app_join_sql)
click_app.show(3)