Я не знаю много искры.В верхней части кода у меня
from pysaprk.sql import SparkSession
import pyspark.sql.function as f
spark = SparkSession.bulder.appName(‘abc’).getOrCreate()
H = sqlContext.read.parquet(‘path to hdfs file’)
H около 30 миллионов записей и будет использоваться в цикле.Поэтому я написал
H.persist().count()
У меня есть список из 50 строк L = [s1,s2,…,s50]
, каждая из которых используется для построения небольшого фрейма данных из H, которые должны располагаться друг над другом.Я создал пустой фрейм данных Z
schema = StructType([define the schema here])
Z = spark.createDataFrame([],schema)
Затем следует цикл
for st in L:
K = process H using st
Z = Z.union(H)
, где K содержит не более 20 строк.Когда L имеет только 2 или 3 элемента, этот код работает.Но для длины L = 50 это никогда не заканчивается.Сегодня я узнал, что я могу использовать контрольные точки.Поэтому я создал путь hadoop и прямо над тем местом, где начинается цикл, я написал:
SparkContext.setCheckpointDir(dirName=‘path/to/checkpoint/dir’)
Но я получаю следующую ошибку: missing 1 required positional argument: ‘self’
.Мне нужно знать, как исправить ошибку и как изменить цикл, чтобы включить контрольную точку.