Я работаю над вариантом использования, когда мне нужно обработать огромное количество данных (несколько таблиц), и я пытаюсь отправить это как пакетное задание в кластер Datapro c (PySpark).
Мой код выглядит примерно так:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
def readconfig():
#code to read a yaml file
def func(filename, tabname):
sc = SparkContext("local", "First App")
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()
df1= read from file-filename as rdd using sqlcontext
df2= read from bigquery-tabname as df using spark
.
op = intermediate processing
.
#caching and unpersisting 2 dfs
.
op.write.csv(write multiple files in gcs bucket)
sc.stop()
spark.stop()
print("one pair of table and file processed")
if __name__ == "__main__":
config= readconfig()
for i,j in config.items():
func(i,j):
Поскольку размеры файлов огромны, я пытаюсь создать отдельный SparkSession
для каждой пары обрабатываемых файлов и таблиц. Он отлично работает, и мне удалось обработать большое количество таблиц. Позже я начал получать предупреждение о проблемах с памятью на узле и, наконец, сообщение об ошибке:
node недостаточно ресурсов. Не удалось создать SparkSession.
Почему это происходит, когда закрытие SparkSession
должно освободить память от данных с предыдущей итерации?