Я новичок в мире искр :-).Попытка написать рабочий процесс слияния объединяет маленькие журналы в большой согласно некоторым правилам.Одним из шагов я хочу преобразовать RDD в объект DataFrame и использовать API, так как проще управлять столбцами.Код ниже:
def main(path):
print("Read from S3:{}".format(path))
with SparkContext() as sc:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
spark.conf.set("spark.driver.extrajavaoptions", "-XX:+UseParallelGC")
spark.conf.set("spark.executor.extrajavaoptions", "-XX:+UseParallelGC")
#spark.conf.set("spark.driver.maxResultSize", "3g")
spark.conf.set("spark.rdd.compress", "true")
rdd = sc.newAPIHadoopFile(
path,
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
keyConverter=None
)
rdd1 = rdd.map(lambda key_val: process_line(key_val[1])).reduceByKey(lambda x, y: x + y)
print("RDD=>{}".format(rdd1.getNumPartitions()))
df = rdd1.toDF(["ns_pod", "log"])
df.write.partitionBy("ns_pod").option("quote", "\u0000").text('hdfs:///merged')
df.explain()
Это хорошо работает в небольшом наборе данных, но когда я тестирую в большом наборе данных, я получаю исключение:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 11 tasks (1268.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Я провел некоторые исследования в Интернете, кажется, что я могупросто увеличьте «spark.driver.maxResultSize», как я показываю в комментариях, но он не масштабируется.Причиной возникновения этой ошибки является мастер-узел, пытающийся собрать данные с рабочих узлов.Интересно, есть ли способ конвертировать RDD в DF без триггера сбора данных?Не уверен, что я делаю что-то не так? :-) А если нет способа, как выбрать значение maxResultSize?(Пример: 80% памяти драйверов?) Спасибо.