Как эффективно присоединиться к сотням кадров Spark? - PullRequest
2 голосов
/ 03 марта 2020

TL; DR;

«Какой самый оптимальный способ объединения тысяч фреймов данных Spark? Можем ли мы распараллелить это соединение? Так как оба не работают для меня. ”

Я пытаюсь объединить тысячи одностолбцовых фреймов данных (с col-символом PK для соединения) и затем сохранить полученный DF в Snowflake.

Операция объединения с циклом около 400 (5 м х 2) таких информационных кадров занимает более 3 часов для завершения на отдельном кластере Spark 32core / 350g, что, я думаю, не должно иметь значения из-за нажатия. В конце концов, разве не все, что Spark делает, это создает DAG для ленивых вычислений?

Вот моя конфигурация Spark:

spark = SparkSession \
    .builder \
    .appName("JoinTest")\
    .config("spark.master","spark://localhost:7077")\
    .config("spark.ui.port", 8050)\
    .config("spark.jars", "../drivers/spark-snowflake_2.11-2.5.2-spark_2.4.jar,../drivers/snowflake-jdbc-3.9.1.jar")\
    .config("spark.driver.memory", "100g")\
    .config("spark.driver.maxResultSize", 0)\
    .config("spark.executor.memory", "64g")\
    .config("spark.executor.instances", "6")\
    .config("spark.executor.cores","4") \
    .config("spark.cores.max", "32")\
    .getOrCreate()

И JOIN l oop:

def combine_spark_results(results, joinKey):
    # Extract first to get going
    # TODO: validations
    resultsDF = results[0]
    i = len(results)

    print("Joining Spark DFs..")
    for result in results[1:]:
        print(i, end=" ", flush=True)
        i -= 1
        resultsDF = resultsDF.join(result, joinKey, 'outer')

    return resultsDF

Я рассмотрел возможность распараллеливания соединений способом сортировки слиянием с использованием starmapasyn c (), однако проблема в том, что Spark DF не может быть возвращен из другого потока. Я также рассмотрел трансляцию основного кадра данных, из которого были созданы все объединяемые однокадровые данные,

spark.sparkContext.broadcast(data)

, но это выдает ту же ошибку, что и попытка вернуть объединенный DF из другого потока, а именно.

PicklingError: Не удалось сериализовать широковещательную рассылку: Py4JError: Произошла ошибка при вызове o3897. getstate . Trace: py4j.Py4JException: метод getstate ([]) не существует

Как я могу решить эту проблему?

Пожалуйста, не стесняйтесь спрашивать, если вы нужна дополнительная информация Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...