Фильтрация миллионов файлов с помощью pySpark и облачного хранилища - PullRequest
3 голосов
/ 06 ноября 2019

Передо мной стоит следующая задача: у меня есть отдельные файлы (например, Мб), хранящиеся в Google Cloud Storage Bucket, сгруппированные по каталогам по дате (каждый каталог содержит около 5 тыс. Файлов). Мне нужно посмотреть на каждый файл (xml), отфильтровать нужный файл и поместить его в Mongo или записать обратно в Google Cloud Storage, скажем, в формате паркета. Я написал простую программу pySpark, которая выглядит следующим образом:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = (
    SparkSession
    .builder
    .appName('myApp')
    .config("spark.mongodb.output.uri", "mongodb://<mongo_connection>") 
    .config("spark.mongodb.output.database", "test") 
    .config("spark.mongodb.output.collection", "test")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)

spark_context = spark.sparkContext
spark_context.setLogLevel("INFO")
sql_context   = pyspark.SQLContext(spark_context)

# configure Hadoop
hadoop_conf = spark_context._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")


# DataFrame schema
schema = StructType([
    StructField('filename', StringType(), True),
    StructField("date", DateType(), True),
    StructField("xml", StringType(), True)
])

# -------------------------
# Main operation
# -------------------------
# get all files
files = spark_context.wholeTextFiles('gs://bucket/*/*.gz')

rows = files \
    .map(lambda x: custom_checking_map(x)) \
    .filter(lambda x: x is not None)

# transform to DataFrame 
df = sql_context.createDataFrame(rows, schema)

# write to mongo
df.write.format("mongo").mode("append").save()

# write back to Cloud Storage
df.write.parquet('gs://bucket/test.parquet')

spark_context.stop()

Я протестировал ее на подмножестве (один каталог gs://bucket/20191010/*.gz), и она работает. Я развернул его в кластере Google Dataproc, но сомневаюсь, что что-то происходит, когда журналы останавливаются после 19/11/06 15:41:40 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1573054807908_0001

Я использую 3 рабочих кластера с 4 ядрами и 15 ГБ ОЗУ + 500 ГБ HDD. Spark версия 2.3.3, scala 2.11 mongo-connector-spark_2.11-2.3.3. Я новичок в Spark, поэтому любые предложения приветствуются. Обычно я писал бы эту работу с использованием многопроцессорной обработки Python, но хотел перейти к чему-то «лучше», но сейчас я не уверен.

1 Ответ

2 голосов
/ 07 ноября 2019

Перечисление очень большого количества файлов в GCS может занять значительное время - скорее всего, ваша работа «зависает», пока драйвер Spark выводит список всех файлов перед началом обработки.

Вы достигнете гораздо большей производительности, еслисначала перечисляя все каталоги, а затем обрабатывая файлы в каждом каталоге - для достижения максимальной производительности вы можете обрабатывать каталоги параллельно, но с учетом того, что в каждом каталоге имеется 5 тыс. файлов, а в вашем кластере всего 3 рабочих, этого может быть достаточно для последовательной обработки каталогов,

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...