Как предотвратить обработку файлов дважды с помощью Spark DataFrames - PullRequest
3 голосов
/ 21 марта 2019

Я обрабатываю паркет от S3 TSV до S3 с помощью клея AWS.Из-за входящих файлов, отличных от UTF-8, я вынужден использовать DataFrames вместо DynamicFrames для обработки моих данных (это известная проблема без обходных путей, что DynamicFrames полностью терпит неудачу с любыми не-UTF8 символами).Это также означает, что я не могу использовать Job Bookmarks в Glue, чтобы отслеживать, какие файлы S3 TSV я уже обработал.

Мой код выглядит так:

# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame

# @params: [JOB_NAME, s3target]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define massive list of fields in the schema
fields = [
    StructField("accept_language", StringType(), True),
    StructField("browser", LongType(), True),
    .... huge list ...
    StructField("yearly_visitor", ShortType(), True),
    StructField("zip", StringType(), True)
]

schema = StructType(fields)

# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(args['s3source'] + "/*.tsv.gz")

# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')

# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))

# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])

job.commit()

Мой вопрос- без закладок задания каждый раз при запуске он обрабатывает одни и те же файлы s3 снова и снова.Как я могу переместить обработанные файлы в исходной корзине s3 в подпапку или что-то в этом роде или иным образом избежать двойной обработки файлов?

Я не уверен, в чем тут хитрость, поскольку Spark является параллельной системой, идаже не зная, что это за файлы.Думаю, я мог бы создать второе задание Glue с типом задания Python Shell и сразу же удалить входящие файлы, но даже тогда я не уверен, какие файлы удалять и т. Д.

Спасибо,

Chris

Ответы [ 4 ]

1 голос
/ 22 апреля 2019

Чтобы отметить обработанные файлы из префикса источника ввода, вам нужно будет использовать boto3 (или awscli напрямую) для перемещения файлов или их удаления.

Чтобы определить, какие файлы обрабатывать,Вы можете продолжить двумя способами:

  • разрешить файл glob args['s3source'] + "/*.tsv.gz", используя boto3 с s3client.list_objects() перед использованием spark.Вы можете предоставить массив разрешенных файлов вместо глобуса для spark.read.load.
import boto3
client = boto3.client('s3')

# get all the available files
# Note: if you expect a lot of files, you need to iterate on the pages of results

response = client.list_objects_v2(Bucket=your_bucket_name,Prefix=your_path_prefix)
files=['s3://'+your_bucket_name+obj['Key'] for obj in response['Contents'] if obj.endswith('tsv.gz')]

 ... initialize your job as before ...

df0 = df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)

 ... do your work as before ...
  • использовать тот факт, что spark отслеживает все свои входные файлы для последующей обработки послеуспешное сохранение:
 ... process your files with pyspark as before...

# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'

files = [] 
for p in df0.rdd._jrdd.partitions(): 
    files.append([f.filePath() for f in p.files().array()])

После того, как у вас есть список файлов, удаление, переименование или добавление их в хранилище метаданных для фильтрации их на следующем задании довольно просто.

Например, чтобы удалить их:

# initialize a S3 client if not already done
from urlparse import urlparse # python 2
import boto3
client = boto3.client('s3')

# do what you want with the uris, for example delete them
for uri in files:
   parsed = urlparse(uri)
   client.delete_object(Bucket=parsed.netloc, Key=parsed.path)
1 голос
/ 21 марта 2019

Если вас не беспокоит повторная обработка одних и тех же исходных файлов (с учетом временных ограничений), и ваш вариант использования не предусматривает дублирование данных в месте назначения, вы можете рассмотреть вопрос об изменении режима сохранения на «Перезаписать», когда запись кадра данных

https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/DataFrameWriter.html

0 голосов
/ 27 апреля 2019

Окончательный рабочий код:

# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
import boto3
from urlparse import urlparse

# Read arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])

# Initialise boto3
client = boto3.client('s3')

# Get all the available files
response = client.list_objects_v2(Bucket = "xxx")
files = [ "s3://xxx/" + obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.tsv.gz') ]

# Initialise the glue job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define massive list of fields in the schema
fields = [
    StructField("accept_language", StringType(), True),
    StructField("browser", LongType(), True),
    .... huge list ...
    StructField("yearly_visitor", ShortType(), True),
    StructField("zip", StringType(), True)
]

schema = StructType(fields)

# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)

# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')

# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))

# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])

# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
    files.extend([f.filePath() for f in p.files().array()])

# Move files to the processed folder
for uri in files:
   parsed = urlparse(uri)
   client.copy_object(CopySource = {'Bucket': parsed.netloc, 'Key': parsed.path.lstrip('/')}, Bucket = parsed.netloc, Key = 'processed' + parsed.path)
   client.delete_object(Bucket = parsed.netloc, Key = parsed.path.lstrip('/'))

job.commit()

0 голосов
/ 25 апреля 2019

Решение, которое я использовал для одного из наших процессов ETL, разработанных на основе клея AWS, заключалось в том, чтобы сначала перечислить и переместить файлы в s3 с помощью API boto3 в папку «РАБОТА».Этот процесс не должен занимать время, так как вы меняете только имя объекта s3, а не какое-либо физическое движение.

После выполнения вышеуказанного шага вы можете использовать папку «РАБОТА» в качестве входных данных для вашего SPARK dataFrame, в то время как новые файлы могут продолжать выталкиваться в другую папку s3.

Я не уверен насчетваш вариант использования, но мы использовали текущую системную дату и время, чтобы создать папку «РАБОТА», чтобы мы могли исследовать или повторно запускать любые файлы, если мы обнаружим какие-либо проблемы с процессом или данными, которые мы загрузили через несколько дней.

...