Как запустить SQL SELECT на AWS Glue, созданном в DataFrame в Spark? - PullRequest
0 голосов
/ 21 мая 2019

У меня есть следующая работа в AWS Glue, которая в основном считывает данные из одной таблицы и извлекает их в виде csv-файла в S3, однако я хочу выполнить запрос к этой таблице (A Select, SUM и GROUPBY) и хочу получить что выводить в CSV, как мне это сделать в AWS Glue? Я новичок в Spark, поэтому, пожалуйста, помогите

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = 
"db1", table_name = "dbo1_expdb_dbo_stg_plan", transformation_ctx = 
"datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = 
[("plan_code", "int", "plan_code", "int"), ("plan_id", "int", "plan_id", 
"int")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = 
applymapping1, connection_type = "s3", connection_options = {"path": 
"s3://bucket"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

1 Ответ

1 голос
/ 21 мая 2019

Функция «create_dynamic_frame.from_catalog» связующего контекста создает динамический фрейм, а не фрейм данных. А динамический фрейм не поддерживает выполнение SQL-запросов.

Для выполнения SQL-запросов вам сначала потребуется преобразовать динамический фрейм в информационный фрейм, зарегистрировать временную таблицу в памяти spark, а затем выполнить SQL-запрос для этой временной таблицы.

Код образца:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)

DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")
df = DyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql('{{your select query with table name that you used for temp table above}}
df.write.format('{{orc/parquet/whatever}}').partitionBy("{{columns}}").save('path to s3 location')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...