Как повторно разделить данные с помощью Spark без временного хранилища? - PullRequest
0 голосов
/ 06 августа 2020

Я запускаю Spark в кластере Kubernetes. При повторном разбиении данных на множество разделов, заставляя их иметь только 1 файл на раздел, я получаю выселение своих модулей.

Ошибка выглядит следующим образом:

The node was low on resource: ephemeral-storage. Container sosreport-spark-cluster-opendatahub-w was using 56291400Ki, which exceeds its request of 0.

Мои конфигурации Spark являются:

def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster) \
        .set('spark.driver.host', HOSTNAME) \
        .set('spark.driver.port', 42000) \
        .set('spark.driver.bindAddress', '0.0.0.0') \
        .set('spark.driver.blockManager.port', 42100) \
        .set('spark.executor.memory', '1536M') \
        .set('spark.executor.cores', '2') \
        .set('spark.sql.parquet.enableVectorizedReader', True) \
        .set('spark.kubernetes.memoryOverheadFactor', '0.20')
    )
    return sc_conf

Вот как я повторно разбиваю данные:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

1 Ответ

1 голос
/ 07 августа 2020

Вероятно, ваша проблема заключается не в том, что вы используете временное хранилище, а в том, что вы отправляете все свои данные одному работнику.

". Repartition (1," created_year ", "created_month", "created_day") "

Вы объединяете все свои данные в один раздел Spark, который затем записывается во все разделы.

Что вы, вероятно, захотите сделать, так это сделать глобальный отсортируйте по ключу раздела, а затем выполните write.partionBy. Это отправляет большую часть данных в один раздел таблицы в ограниченное количество искровых разделов. (обычно это один искровый раздел, если ваши разделы маленькие)

это обычно выглядит как ...

    df.orderBy("partitionCol")
      .write
      .partitionBy("partitionCol")
      .insertInto("my_table")
...