У меня есть данные CSV, которые сканируются через сканер клея и в итоге оказываются в одной таблице.
Я пытаюсь запустить задание ETL, чтобы разделить данные на диске на некоторые компоненты столбца даты. Затем преобразовать CSV в паркет.
т.е. У меня есть столбец с именем «date» в моих данных, и я хотел разделить данные на год, месяц, день на s3.
Я могу преобразовать в паркет и правильно разделить его по значению серийного номера (другой столбец), но он помещает значение "__HIVE_DEFAULT_PARTITION__" для всех значений год, месяц и день для связанной даты перегородки.
Я могу разбить на другие столбцы (например, серийный номер), но год / месяц / день не указаны в исходном наборе данных, и поэтому мой подход заключается в создании значений из столбца даты в качестве новых столбцов в наборе данных и скажите функции write_dynamic_frame разделить по столбцам, но это не работает.
Я новичок в Spark / Pyspark и клее в целом, так что вполне возможно, что я упускаю что-то простое.
Спасибо всем, кто предлагает помощь.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
to_spark_df4 = dropnullfields3.toDF()
with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()).withColumn('year', F.year(F.col("date").cast("date"))).withColumn('month', F.month(F.col("date").cast("date"))).withColumn('day', F.dayofmonth(F.col("date").cast("date")))
back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = back_to_glue_df8, connection_type = "s3", connection_options = {"path": "s3://output/path","partitionKeys": ["serial-number","year", "month","day"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
В результате мои ключи в s3 выглядят так:
serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet
Обновление: отредактировано для форматирования