Ошибки памяти с итеративным созданием Spark DataFrame - PullRequest
2 голосов
/ 06 апреля 2019

Я конвертирую необработанные записи, которые поступают мне в виде сжатых файлов zlib, в обогащенные записи паркета для последующей обработки в Spark. Я не контролирую файл zlib, и мне нужен паркет, совместимый с другой обработкой. Я работаю в Pyspark и Spark 2.3. Мой подход работает, за исключением случаев, когда файл zlib достаточно большой (~ 300 МБ). Он хорошо хранится в памяти, но у Spark не хватает памяти. Если я запихну память водителя (8g), это сработает. Это похоже на утечку памяти при использовании вызовов функций, как показано ниже.

Процесс обогащения несколько разбирает данные в Spark, поэтому я использую итеративное создание фрейма данных и память драйвера приличного размера (по умолчанию 4g). Я загружаю весь исходный распакованный файл в память, а затем передаю его в простую подпрограмму, чтобы создать фрейм данных искры, добавить столбцы в процессе обогащения и затем записать в паркет. Куски установлены в размере. Само обогащение может варьироваться, но в целом размер куска не должен создавать слишком большой размер кадра данных в драйвере, я считаю. Размер выходных блоков в формате паркета составляет около 100 МБ.

def process_file(gzfile, spark, chunk_size=2000000):
    # load_data_and decompress
    data = load_original_data(gzfile)
    if len(data) == 0:
        raise ValueError("No records loaded from file ", gzfile)
    chunks = len(data) // chunk_size + 1
    offset = 0
    for chunk in range(chunks):
        # convert the chunk into a spark dataframe 
        df = raw_to_spark(data[offset:offset+chunk_size], spark)
        offset += chunk_size
        # enrich the data while in a spark dataframe w/ more columns
        df = extract_fields_from_raw(df)
        save_to_parquet(df, parquet_output_path)
    return 


def raw_to_spark(events: List[str], spark: pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    """
    convert the list of raw strings into a spark dataframe so we can do all the processing. this list is large in
    memory so we pop one list while building the new one. then throw the new list into spark. 
"""
    schema = StructType([StructField("event", StringType())])
    rows = []  # make the list smaller as we create the row list for the dataframe
    while events:
        event = events.pop()
        if event.count(",") >= 6:  # make sure there are 7 fields at least
            rows.append(pyspark.sql.Row(event))
    rdd = spark.sparkContext.parallelize(rows, numSlices=2000)  # we need to partition in order to pass to workers
    return spark.createDataFrame(rdd, schema=schema)

def extract_fields_from_raw(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
"""
this adds columns to the dataframe for enrichment before saving
"""

Итак, кадры данных Spark размером <2M создаются, когда я перебираю распакованные данные. У каждого из этих фреймов данных не должно возникнуть проблем, если они находятся в пространстве драйверов 4g. Я получаю ту же ошибку, например, если бы я использовал чанк 1M. Журнал Spark показывает, что ошибка нехватки памяти будет иметь такое же потребление памяти до сбоя, например, 4,5 ГБ используемой памяти. </p>

Я подозреваю, что происходит то, что память не освобождается после каждого вызова raw_to_spark, но я не уверен, как это показать ... и я не вижу логически, как протолкнуть преобразование данных в Spark. функция иначе.

Я пропускаю лучшую практику? спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...