Клей закладки AWS - PullRequest
       39

Клей закладки AWS

0 голосов
/ 11 декабря 2018

Как проверить, что мои закладки работают?Я обнаружил, что, когда я запускаю работу сразу после предыдущего завершения, это, похоже, все еще занимает много времени.Это почему?Я думал, что он не будет читать файлы, которые он уже обработал?Сценарий выглядит следующим образом:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://xxx-glue/testing-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")

if bool(inputGDF.toDF().head(1)):
    print("Writing ...")
    inputGDF.toDF() \
      .drop("createdat") \
      .drop("updatedat") \
      .write \
      .mode("append") \
      .partitionBy(["querydestinationplace", "querydatetime"]) \
      .parquet("s3://xxx-glue/testing-parquet")
else:
    print("Nothing to write ...")

job.commit()

import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='xxx-testing-partitioned')

Внешний вид выглядит следующим образом:

18/12/11 14:49:03 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:03 DEBUG Client: 
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:04 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:04 DEBUG Client: 
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:05 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:05 DEBUG Client: 
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
...

18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv:0+1194081
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-14_2018-11-18.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 89.0 in stage 0.0 (TID 89). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 92
18/12/11 14:42:00 INFO Executor: Running task 92.0 in stage 0.0 (TID 92)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv:0+1137753
18/12/11 14:42:00 INFO Executor: Finished task 88.0 in stage 0.0 (TID 88). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 93
18/12/11 14:42:00 INFO Executor: Running task 93.0 in stage 0.0 (TID 93)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv:0+1346626
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 90.0 in stage 0.0 (TID 90). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO Executor: Finished task 91.0 in stage 0.0 (TID 91). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 94
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 95
18/12/11 14:42:00 INFO Executor: Running task 95.0 in stage 0.0 (TID 95)
18/12/11 14:42:00 INFO Executor: Running task 94.0 in stage 0.0 (TID 94)

... Я замечаю, что к паркету добавлено много повторяющихся данных ... Является ли закладка нетза работой?Уже включен

Ответы [ 5 ]

0 голосов
/ 13 августа 2019

Требования к закладкам

С документы

  1. Задание должно быть создано с помощью --job-bookmark-option job-bookmark-enable (или, если используется консоль, тов настройках консоли).Работа должна также иметь имя работы;это будет передано автоматически.

  2. Задание должно начинаться с Job.Init(jobname) например

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

Задание должно иметь Job.Commit() для сохранения состояния закладки и успешного завершения.

Источником данных должен быть либо источник s3, либо JDBC (ограниченный, а не ваш вариант использования, поэтому я его проигнорирую).

Пример в документации показывает создание динамического кадра из каталога (формирование клея / озера) с использованием имени таблицы, а не явного пути S3.Это подразумевает, что чтение из каталога все еще считается источником S3;базовые файлы будут на S3.

Файлы на s3 должны быть одним из JSON, CSV, Apache Avro, XML для версии 0.9 и выше или могут быть Parquet или ORCдля версии 1.0 и выше

Источник данных в сценарии должен иметь параметр transformation_ctx.
В документах указано

передавайте параметр translation_ctx только тем методам, для которых вы хотите включить закладки. Вы можете добавить это к каждому преобразованию для сохранения состояния, но критическим (ыми) является источник (и) данных, который вы хотите добавить в закладки.

Устранение неполадок

Начиная с документы

  • Максимальный параллелизм должен быть равен 1. Чем выше значения, тем больше закладок
  • Также упоминается job.comit() и использование transformation_ctx, как указано выше

Для источников ввода Amazon S3 в закладках заданий проверяется последнее изменение время объектов, а не имен файлов, чтобы проверить, какие объекты необходимо повторно обработать.Если ваши исходные данные были изменены с момента последнего запуска задания, файлы будут повторно обработаны при следующем запуске задания.

Другие вещи, которые нужно проверить

  • Вы убедились, что ваши CSV-файлы в пути "s3://xxx-glue/testing-csv" еще не содержат дубликатов?Вы можете использовать сканер клея или написать DDL в Афине, чтобы создать таблицу поверх них и посмотреть прямо.В качестве альтернативы создайте конечную точку разработчика, запустите блокнот zeppelin или sagemaker и пошагово изучите ваш код.

  • Нигде не упоминается, что редактирование вашего сценария сбросит ваше состояние, однако, если вы изменилиtransformation_ctx источника данных или других этапов, которые могут повлиять на состояние, однако я этого не проверял.Задание имеет Jobname, которое задает состояние, а также номер запуска, номер попытки и номер версии, которые используются для управления повторными попытками, и последнее состояние, что означает, что незначительные изменения в сценарии не влияют на состояние до тех пор, покапоскольку Jobname соответствует, но опять же я этого не проверял.

  • Кроме того, в своем коде вы проверяете на inputGDF.toDF().head(1), а затем запускаете inputGDF.toDF()... для записи данных.Spark лениво оценивается, но в этом случае вы дважды запускаете эквивалентный динамический кадр для информационного кадра, и Spark не может его кэшировать или использовать повторно.Лучше сделать что-то вроде df = inputGDF.toDF() перед if, а затем дважды использовать df.

0 голосов
/ 26 марта 2019

Закладки не поддерживаются для формата паркета: https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

0 голосов
/ 19 декабря 2018

Пожалуйста, проверьте этот документ о механизме закладок AWS Glue.

В основном требуется включить его через консоль (или CloudFormation) и указать параметр tranformation_context, который используется вместе с некоторыми другими атрибутами(например, имя задания, имена исходного файла) для сохранения информации о контрольных точках.Если вы измените значение одного из этих атрибутов, то Glue будет рассматривать его как другую контрольную точку.

0 голосов
/ 26 декабря 2018

https://docs.aws.amazon.com/glue/latest/dg/monitor-debug-multiple.html можно использовать для проверки работоспособности закладки или нет

0 голосов
/ 13 декабря 2018

Только для записи, а так как ответов пока нет.

Я думаю, что редактирование сценария, кажется, влияет на закладки ... но я думал, что это не должно ...

...