Все исполнители не используются при чтении JSON (zip .gz) в GCP из искрового кластера Google dataproc с использованием spark-submit - PullRequest
2 голосов
/ 13 июня 2019

Я только что познакомился с этим удивительным миром технологий больших данных и облачных вычислений, использующих GCP (dataproc) и pyspark. У меня есть ~ 5 ГБ размер JSON файл (в архиве, файл gz), содержащий ~ 5 миллионов записей, мне нужно прочитать каждую строку и обработать только те строки, которые удовлетворяет определенному условию. У меня есть рабочий код, и я выдал spark-submit с --num-partitions = 5, но все же только один рабочий используется для выполнения действия.

Я использую команду spark-submit:

spark-submit --num-executors 5 --py-files /home/user/code/dist/package-0.1-py3.6.egg job.py

job.py:

path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
mi = spark.read.json(path)
inf_rel = mi.select(mi.client_id,
                    mi.user_id,
                    mi.first_date,
                    F.hour(mi.first_date).alias('hour'),
                    mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Конфигурация Dataproc: (Я сейчас использую бесплатную учетную запись, как только я получу работающее решение, добавим больше ядер и исполнителей)

(Debian 9, Hadoop 2.9, Spark 2.4) Главный узел: 2 виртуальных ЦП, 7,50 ГБ памяти Основной размер диска: 32 ГБ 5 рабочих узлов: 1 vCPU, 3,75 ГБ памяти Тип основного диска: 32 ГБ

После spark-submit я вижу в веб-интерфейсе, что было добавлено 5 исполнителей, но тогда остается активным только 1 исполнитель и выполняет все задачи, а остальные 4 освобождаются. enter image description here

Я провел свое исследование, и большинство вопросов говорят о доступе к данным через JDBC.

Пожалуйста, подскажите, чего мне здесь не хватает.

P.S. В конце концов я прочитал бы 64 файла json по 5 ГБ каждый, поэтому мог бы использовать 8 основных * 100 рабочих.

1 Ответ

1 голос
/ 14 июня 2019

Лучше всего предварительно обработать ввод. Для одного входного файла spark.read.json(... создаст одну задачу для чтения и анализа данных JSON, поскольку Spark не может заранее знать, как распараллелить их. Если ваши данные представлены в формате JSON с разделителями строк (http://jsonlines.org/),), лучше всего заранее разбить их на управляемые куски:

path = "gs://dataproc-bucket/json-files/data_5M.json"
# read monolithic JSON as text to avoid parsing, repartition and *then* parse JSON
mi = spark.read.json(spark.read.text(path).repartition(1000).rdd)
inf_rel = mi.select(mi.client_id,
                   mi.user_id,
                   mi.first_date,
                   F.hour(mi.first_date).alias('hour'),
                   mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Ваш начальный шаг здесь (spark.read.text(...) все еще будет узким местом как отдельное задание. Если ваши данные не разделены строкой или (особенно!) Вы ожидаете, что вам нужно будет работать с этими данными более одного раза, вы должны найти способ превратить ваш 5-гигабайтный JSON-файл в 1000 5-мегабайтный JSON-файл, прежде чем подключать Spark.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...