AWS Glue: Как добавить столбец с исходным именем файла в выводе? - PullRequest
0 голосов
/ 10 мая 2018

Кто-нибудь знает способ добавить исходное имя файла в виде столбца в задании Glue?

Мы создали поток, в котором мы сканировали некоторые файлы в S3 для создания схемы.Затем мы написали задание, которое преобразует файлы в новый формат, и записывает эти файлы обратно в другое хранилище S3 как CSV, чтобы использовать его в остальной части нашего конвейера.Мы хотели бы получить доступ к некоторым мета-свойствам задания, чтобы мы могли добавить в выходной файл новый столбец, содержащий исходное имя файла.

Я просмотрел документацию AWS и источник aws-glue-libs, но не увидел ничего, что выскочило.В идеале был бы какой-то способ получить метаданные из пакета awsglue.job (мы используем версию Python).

Я все еще изучаю Glue, поэтому извиняюсь, если использую неправильную терминологию.Я также пометил это тегом-искрой, потому что я считаю, что это то, что Клей использует под крышками.

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

С помощью автоматически сгенерированного скрипта AWS Glue Python я добавил следующие строки:

from pyspark.sql.functions import input_file_name

## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")

Затем в частях кода ApplyMapping или datasink вы ссылаетесь на datasource2.

0 голосов
/ 11 мая 2018

Вы можете сделать это с искрой в своей работе etl:

var df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))

Помните, что он работает только с API getCatalogSource (), а не с create_dynamic_frame_from_options ()

...