Добавить новую колонку в таблицу целей в AWS Glue-ETL - PullRequest
0 голосов
/ 06 мая 2019

Я новичок в AWS Glue ETL.Я пытаюсь выполнить простой расчет и добавить производный столбец в список целевой таблицы.Когда я запрашиваю, я вижу данные, но я изо всех сил пытаюсь добавить их в свой окончательный набор данных.Пожалуйста, помогите мне по тому же самому в ближайшее время.Спасибо

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "stg", table_name = "xyz", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "stg", table_name = "wind_gust", transformation_ctx = "datasource0")
## ==== Transformation ======
datasource0.toDF().createOrReplaceTempView("view_dyf")
sqlDF = spark.sql("select * from view_dyf").show()
## convert units from EU  to US units
us_unit_conv =spark.sql("""SELECT IF (mesurement_type = 'm s-1', round(units * 1.151,2),
                    IF (mesurement_type = 'm', round(units / 1609.344,2),
                      IF (mesurement_type = 'Pa', round(units /6894.757,2),0) )
                      )as new_unit
            from view_dyf""")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "Time", "string"), ("latitude", "double", "Latitude", "double"), ("longitude", "double", "Longitude", "double"), ("units", "double", "EU_Units", "double"), ("mesurement_type", "string", "EU_Unit_Type", "string"), ("variable_name", "string", "Variable_Name", "string")], transformation_ctx = "applymapping1")

Я добавил новый производный столбец как - ("us_unit_conv", "double", "US_Units", "double").Пожалуйста, обратитесь ниже

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "Time", "string"), ("latitude", "double", "Latitude", "double"), ("longitude", "double", "Longitude", "double"), ("units", "double", "EU_Units", "double"), ("mesurement_type", "string", "EU_Unit_Type", "string"), ("us_unit_conv", "double", "US_Units", "double"), ("variable_name", "string", "Variable_Name", "string")], transformation_ctx = "applymapping1")

1 Ответ

0 голосов
/ 07 мая 2019

Я думаю, вам нужно прочитать немного больше о применении сопоставления: ссылка.

  1. Вы указываете неправильный кадр, вы указываете datasource0, но это должен быть ваш новый кадр us_unit_conv,Так как это созданный вами кадр, который содержит вашу новую переменную.
  2. Отображение также немного неверно.("us_unit_conv", "double", "US_Units", "double"), это должно быть ("input_name", "input_type", "output_name", "output_type").Так что в вашем случае, я думаю, это будет ("new_unit", "double", "US_Units", "double").Но вам также необходимо передать остальную часть переменной с помощью SELECT *.
s_unit_conv =spark.sql("""SELECT *,IF (mesurement_type = 'm s-1', round(units * 1.151,2),
                    IF (mesurement_type = 'm', round(units / 1609.344,2),
                      IF (mesurement_type = 'Pa', round(units /6894.757,2),0) )
                      )as new_unit
            from view_dyf""")

applymapping1 = ApplyMapping.apply(frame = s_unit_conv, mappings = [("new_unit", "double", "US_Units", "double"),("time", "string", "Time", "string"), ("latitude", "double", "Latitude", "double"), ("longitude", "double", "Longitude", "double"), ("units", "double", "EU_Units", "double"), ("mesurement_type", "string", "EU_Unit_Type", "string"), ("variable_name", "string", "Variable_Name", "string")], transformation_ctx = "applymapping1")
...