AWS Glue ETL и PySpark и многораздельные данные: как создать столбец данных из раздела - PullRequest
1 голос
/ 30 апреля 2019

У меня есть данные в корзине S3, содержащей много json-файлов, которые выглядят примерно так:

s3://bucket1/news/year=2018/month=01/day=01/hour=xx/

Раздел day содержит несколько hour=xx разделов, по одному на каждый час дня.Я запускаю задание Glue ETL для файлов в разделе day и создаю клей dynamic_frame_from_options.Затем я применяю отображение, используя ApplyMapping.apply, которое работает как шарм.

Однако затем я хотел бы создать новый столбец, содержащий значение hour, основанный на разделе каждого файла.Я могу использовать Spark для создания нового столбца с константой, однако как мне сделать этот столбец для использования раздела в качестве источника?

df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))

Edit1

В статье AWS о том, как использовать многораздельные данные, используется сканер Glue перед созданием dynamicFrame, а затем создание dynamicFrame из каталога Glue.Мне нужно создать dynamicFrame непосредственно из источника S3. введите описание ссылки здесь

Ответы [ 3 ]

3 голосов
/ 07 мая 2019

Я не совсем понимаю, что вам нужно делать.Разве у вас уже нет значения hour, если у вас есть файлы, разделенные на нем, или только при использовании create_dynamic_frame .from_catalog вы получите его?Вы можете сделать df1["hour"] или df1.select_fields["hour"]?

Вам не нужно импортировать какие-либо библиотеки, если ваши данные разделены на ts(timestamp in yyyymmddhh format), это вы можете выполнить с чистым python в Spark.

Пример кода.Сначала я создаю некоторые значения, которые будут заполнять мой DataFrame.Затем создайте новую переменную, как показано ниже.

df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120|                1|  20|
|2019010121|                2|  21|
|2019010122|                3|  22|
|2019010123|                4|  23|
+----------+-----------------+----+
0 голосов
/ 11 мая 2019

Насколько я понимаю, ваша проблема заключается в том, что вы хотели бы построить фрейм данных для данного дня с часами в качестве разделов. Обычно, если вы используете разделенные пути в стиле Apache Hive и ваши файлы имеют одинаковую схему, у вас не должно возникнуть проблем с использованием

ds = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
    'json')

или ...

df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')

Так что, если это не работает, вам следует проверить, используете ли вы разделенные пути в стиле Apache Hive, и ваши файлы имеют одинаковую схему.

Вы также можете попробовать использовать boto3 framework в Glue (это может быть полезно для вас):

import boto3
s3 = boto3.resource('s3')

Полезная ссылка:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html

0 голосов
/ 06 мая 2019

Я не знаком с AWS Glue, если данная ссылка не работает для вашего случая, тогда вы можете попробовать и посмотреть, подходит ли вам следующий обходной путь:

Получите имя файла, используя input_file_name , затем используйте regexp_extract, чтобы получить нужный столбец раздела из имени файла:

from pyspark.sql.functions import input_file_name, regexp_extract

df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))
...