Spark работа с использованием хранилища HDFS - PullRequest
4 голосов
/ 01 октября 2019

У меня есть долгосрочное задание на структурированную потоковую передачу Spark, запущенное в Google Cloud Dataproc, в котором Kafka используется как источник и приемник. Я также сохраняю свои контрольные точки в облачном хранилище Google.

После работы в течение недели я заметил, что она постоянно использует все дисковое хранилище объемом 100 ГБ, сохраняя файлы в /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/....

* 1005. * Насколько я понимаю, моя работа Spark не должна зависеть от локального дискового хранилища.

Я совершенно не понимаю этого здесь?

Я отправил свою работу так:

(cd  app/src/packages/ &&  zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
    --cluster cluster-26aa \
    --region us-central1 \
    --properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    --py-files build/mypkg.zip \
    --max-failures-per-hour 10 \
    --verbosity info \
    app/src/explode_rmq.py

Это важные части моей работы:

Источник:

 spark = SparkSession \
        .builder \
        .appName("MyApp") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addPyFile('mypkg.zip')

    df = spark \
        .readStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("subscribe", "lsport-rmq-12") \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))

Раковина:

    sink_kafka_q = sink_df \
        .writeStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("topic", "my_topic") \
        .option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
        .start()

Ответы [ 2 ]

1 голос
/ 01 октября 2019

Если памяти недостаточно, Spark сохранит информацию на локальном диске. Вы можете отключить постоянство на диске следующим образом:

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

Или вы можете попытаться сериализовать информацию, чтобы занять меньше памяти, как это

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

Чтение сериализованных данных будет больше загружать процессоринтенсивный.

Каждый кадр данных имеет свой отдельный уровень сериализации.

Для получения дополнительной информации: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

0 голосов
/ 09 октября 2019

Можете ли вы подключиться по SSH к главному узлу и выполнить следующую команду, чтобы выяснить, кто использует пространство HDFS?

hdfs df -du -h /

Я тестировал с помощью простого задания Spark Pi,

перед темвыполнение задания:

$ hdfs dfs -du /
34       /hadoop
0        /tmp
2107947  /user

после завершения задания:

$ hdfs dfs -du /user/
0        /user/hbase
0        /user/hdfs
0        /user/hive
0        /user/mapred
0        /user/pig
0        /user/root
2107947  /user/spark
0        /user/yarn
0        /user/zookeeper

$ hdfs dfs -du /user/spark/
2107947  /user/spark/eventlog

Кажется, что оно используется журналом событий Spark, см. spark.eventLog.dir . Вы можете рассмотреть сжатие журнала событий с помощью spark.eventLog.compress=true или отключить его с помощью spark.eventLog.enabled=false

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...