Правильная структура будет
import uuid
def process_batch(rdd):
if not rdd.isEmpty():
result.saveAsTextFiles("/Users/rocket/Downloads/output-{}".format(
str(uuid.uuid4())
) ,"txt")
result.foreachRDD(process_batch)
Однако, как вы видите выше, для каждой партии требуется отдельный каталог, поскольку RDD API не имеет режима append
.
И альтернативой может быть:
def process_batch(rdd):
if not rdd.isEmpty():
lines = rdd.map(str)
spark.createDataFrame(lines, "string").save.mode("append").format("text").save("/Users/rocket/Downloads/output")