PySpark Streaming «PicklingError: Не удалось сериализовать объект» при проверке и преобразовании - PullRequest
0 голосов
/ 23 мая 2019

В PySpark Steaming, если включена контрольная точка и есть операция transform-join, выдается ошибка.

sc=SparkContext(appName='xxxx')
sc.setLogLevel("WARN")
ssc=StreamingContext(sc,10)
ssc.checkpoint("hdfs://xxxx/test")

kafka_bootstrap_servers="xxxx"
topics = ['xxxx', 'xxxx']

doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))
kvds=KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})

line=kvds.map(lambda x:(1,2))

line.transform(lambda rdd:rdd.join(doc_info)).pprint(10)

ssc.start()
ssc.awaitTermination()

Сведения об ошибке:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Подобный код прекрасно работает в Scala.И если мы удалим любую из

ssc.checkpoint("hdfs://xxxx/test")

или

line.transform(lambda rdd:rdd.join(doc_info))

Также не будет ошибки.

...