Я запускаю потоковую работу pyspark. Для каждого rdd я обновляю временную таблицу некоторыми новыми данными, которые я хочу кешировать, как показано ниже:
def forach_rdd(rdd):
sqlContext = SQLContext(rdd.context)
cached_data_df = sqlContext.sql("SELECT * FROM temp_table WHERE UPDATED_ON >= NOW() - INTERVAL 24 HOUR")
external_df = sqlContext.read.format("jdbc").options(
url=config.value.get('host'),
driver="com.mysql.jdbc.Driver",
user=config.value.get('username'),
password=config.value.get('password'),
fetchsize=25000,
query="SELECT * FROM temp_table WHERE /*SOME THRESHOLD FOR NEW VALUES*/"
).load()
union_df = cached_data_df.union(external_df).coalesce(3).cache()
union_df.createOrReplaceTempView('temp_table')
# operate on union_df
DStream.foreachRDD(forach_rdd)
Через несколько часов задание spark падает из-за переполнения стека;) Причина в том, что Скорее всего, это связано с растущим деревом зависимостей rdd под фреймом данных.
Мой вопрос: как заставить форсировать создание нового фрейма данных с обновленными данными, но без истории зависимостей.
Полагаю, что-то как показано ниже, будет работать, но это не кажется очень эффективным:
sc.parallelize(union_df.collect()).toDF(union_df.schema)
Есть ли лучший способ сделать это? Я бы приветствовал любые подсказки.
[править] Я загрузил трассировку стека исключений в pastebin, так как он немного длинный: https://pastebin.com/raw/3sPNdyUa