Как установить setCheckpoint в pyspark - PullRequest
0 голосов
/ 17 февраля 2019

Я не знаю много искры.В верхней части кода у меня

from  pysaprk.sql import SparkSession
import pyspark.sql.function as f
spark = SparkSession.bulder.appName(‘abc’).getOrCreate()
H = sqlContext.read.parquet(‘path to hdfs file’)

H около 30 миллионов записей и будет использоваться в цикле.Поэтому я написал

H.persist().count()

У меня есть список из 50 строк L = [s1,s2,…,s50], каждая из которых используется для построения небольшого фрейма данных из H, которые должны располагаться друг над другом.Я создал пустой фрейм данных Z

schema = StructType([define the schema here])
Z = spark.createDataFrame([],schema)

Затем следует цикл

for st in L:
    K = process H using st
    Z = Z.union(H)

, где K содержит не более 20 строк.Когда L имеет только 2 или 3 элемента, этот код работает.Но для длины L = 50 это никогда не заканчивается.Сегодня я узнал, что я могу использовать контрольные точки.Поэтому я создал путь hadoop и прямо над тем местом, где начинается цикл, я написал:

SparkContext.setCheckpointDir(dirName=‘path/to/checkpoint/dir’)

Но я получаю следующую ошибку: missing 1 required positional argument: ‘self’.Мне нужно знать, как исправить ошибку и как изменить цикл, чтобы включить контрольную точку.

1 Ответ

0 голосов
/ 17 февраля 2019

Создайте объект для SparkContext, и тогда вам не нужно указывать параметр self.Также удалите имя параметра, который не нужен.

Работает такой код, как показано ниже:

from pyspark import SparkConf
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf())
sc.setCheckpointDir(‘path/to/checkpoint/dir’)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...