TL; DR;
«Какой самый оптимальный способ объединения тысяч фреймов данных Spark? Можем ли мы распараллелить это соединение? Так как оба не работают для меня. ”
Я пытаюсь объединить тысячи одностолбцовых фреймов данных (с col-символом PK для соединения) и затем сохранить полученный DF в Snowflake.
Операция объединения с циклом около 400 (5 м х 2) таких информационных кадров занимает более 3 часов для завершения на отдельном кластере Spark 32core / 350g, что, я думаю, не должно иметь значения из-за нажатия. В конце концов, разве не все, что Spark делает, это создает DAG для ленивых вычислений?
Вот моя конфигурация Spark:
spark = SparkSession \
.builder \
.appName("JoinTest")\
.config("spark.master","spark://localhost:7077")\
.config("spark.ui.port", 8050)\
.config("spark.jars", "../drivers/spark-snowflake_2.11-2.5.2-spark_2.4.jar,../drivers/snowflake-jdbc-3.9.1.jar")\
.config("spark.driver.memory", "100g")\
.config("spark.driver.maxResultSize", 0)\
.config("spark.executor.memory", "64g")\
.config("spark.executor.instances", "6")\
.config("spark.executor.cores","4") \
.config("spark.cores.max", "32")\
.getOrCreate()
И JOIN l oop:
def combine_spark_results(results, joinKey):
# Extract first to get going
# TODO: validations
resultsDF = results[0]
i = len(results)
print("Joining Spark DFs..")
for result in results[1:]:
print(i, end=" ", flush=True)
i -= 1
resultsDF = resultsDF.join(result, joinKey, 'outer')
return resultsDF
Я рассмотрел возможность распараллеливания соединений способом сортировки слиянием с использованием starmapasyn c (), однако проблема в том, что Spark DF не может быть возвращен из другого потока. Я также рассмотрел трансляцию основного кадра данных, из которого были созданы все объединяемые однокадровые данные,
spark.sparkContext.broadcast(data)
, но это выдает ту же ошибку, что и попытка вернуть объединенный DF из другого потока, а именно.
PicklingError: Не удалось сериализовать широковещательную рассылку: Py4JError: Произошла ошибка при вызове o3897. getstate . Trace: py4j.Py4JException: метод getstate ([]) не существует
Как я могу решить эту проблему?
Пожалуйста, не стесняйтесь спрашивать, если вы нужна дополнительная информация Заранее спасибо.