У меня есть итеративный алгоритм (pyspark), в котором я обновляю часть моего Spark DataFrame. Я делаю это через цикл для каждой итерации, и моя работа становится все дороже и имеет более длинную родословную. На итерации i у меня есть линия итерации i-1 +
несколько шагов (линия становится все более и более длинной).
Я перепробовал много вариантов, чтобы сломать родословную, но это не работает. Вот мой исходный код. Я работаю на jupyterLab VM.
def chronologically_compute(myDataFrame, number_of_compute, spark_session):
# UDFs
find_law_to_apply_udf = udf(find_law_to_apply, IntegerType())
compute_loss_udf = udf(compute_loss, FloatType())
TIMING = []
#myDataFrame = myDataFrame.repartition(1000)
spark_session.sparkContext.setCheckpointDir("myDirectory")
#myDataFrame.explain(True)
#myDataFrame.checkpoint()
for i in range(1, number_of_compute + 1):
debutRank = time.time()
print("Itération", i)
myDataFrame = myDataFrame.withColumn("column1",
when(myDataFrame.rank == i, find_law_to_apply_udf("updatedComputed")
).otherwise(myDataFrame.column1))
myDataFrame = myDataFrame.withColumn("SelectedValue",
when(myDataFrame.rank == i, myDataFrame["column2"].getItem(col("column1") - 1)
).otherwise(myDataFrame.SelectedValue))
myDataFrame = myDataFrame.withColumn("computed",
when(myDataFrame.rank == i, compute_loss_udf("SelectedValue", "Time")
).otherwise(myDataFrame.computed))
window = Window.partitionBy('ID')
myDataFrame = myDataFrame.withColumn('computedSum', sum("computed").over(window))
myDataFrame = myDataFrame.withColumn('updatedComputed',
when(myDataFrame.rank == i, myDataFrame.computedSum + myDataFrame.updatedComputed
).otherwise(myDataFrame.updatedComputed))
myDataFrame = myDataFrame.withColumn('updatedComputed',
when(myDataFrame.rank == i + 1, myDataFrame.computedSum + myDataFrame.updatedComputed
).otherwise(myDataFrame.updatedComputed))
if i % 10 == 0:
d = time.time()
myDataFrame.checkpoint()
print(myDataFrame.count())
#myDataFrame.persist(StorageLevel.DISK_ONLY_2)
duree_lineage = time.time() - d
print("Lineage took {0}".format(duree_lineage))
TIMING.append(duree_lineage)
duree = time.time() - debutRank
print("Modif took {0}".format(duree))
print("Iteration time sum", np.sum(TIMING))
print("Iteration time avg", np.mean(TIMING))
return myDataFrame
def main(spark_session):
try:
spark_jobs(spark_session)
except Exception as ex:
print(traceback.format_exc())
raise
if __name__ == "__main__":
SPARK_SESSION = SparkSession \
.builder \
.appName("AppName") \
.enableHiveSupport() \
.config('spark.executor.memory','2g') \
.config('spark.driver.memory','2g') \
.config('spark.driver.maxResultsSize','2g') \
.config("spark.logLineage", "true") \
.config("spark.executor.extraJavaOptions","-Xss32M") \
.getOrCreate()
main(SPARK_SESSION)
SPARK_SESSION.stop()