PySpark + AWS EMR: df.count () занимает много времени для завершения - PullRequest
0 голосов
/ 13 июля 2020

Я использую действие count() для запуска моей функции udf. Это работает, но спустя много времени после того, как моя функция udf завершит работу, df.count () займет несколько дней. Сам фрейм данных невелик и содержит от 30 000 до 100 000 строк.

AWS Настройки кластера:

  • 1 m5.4xlarge для главного узла
  • 2 m5.4xlarge для рабочих узлов.

Переменные и настройки Spark (Это переменные искры, используемые для запуска скрипта)

  • - ядра-исполнители 4

  • - conf spark. sql .execution.arrow.enabled = true

  • 'spark. sql .inMemoryColumnarStorage.batchSize', 2000000 (устанавливается внутри скрипта pyspark)

Psuedo Code

Вот фактическая структура нашего скрипта. Специальная функция pandas udf вызывает базу данных PostGres для каждой строки.

from pyspark.sql.functions import pandas_udf, PandasUDFType

# udf_schema: A function that returns the schema for the dataframe

def main():
    # Define pandas udf for calculation
    # To perform this calculation, every row in the 
    # dataframe needs information pulled from our PostGres DB
    # which does take some time, ~2-3 hours
    @pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
    def calculate_values(local_df):
        local_df = run_calculation(local_df)
        return local_df

    # custom function that pulls data from our database and
    # creates the dataframe
    df = get_df()

    df = df\
        .groupBy('some_unique_id')\
        .apply(calculate_values)

    print(f'==> finished running calculation for {df.count()} rows!')

    return
...