Уменьшите количество зависимостей в фрейме данных pyspark - PullRequest
2 голосов
/ 28 апреля 2020

Я запускаю потоковую работу pyspark. Для каждого rdd я обновляю временную таблицу некоторыми новыми данными, которые я хочу кешировать, как показано ниже:

def forach_rdd(rdd):
    sqlContext = SQLContext(rdd.context)
    cached_data_df = sqlContext.sql("SELECT * FROM temp_table WHERE UPDATED_ON >= NOW() - INTERVAL 24 HOUR")

    external_df = sqlContext.read.format("jdbc").options(
        url=config.value.get('host'),
        driver="com.mysql.jdbc.Driver",
        user=config.value.get('username'),
        password=config.value.get('password'),
        fetchsize=25000,
        query="SELECT * FROM temp_table WHERE /*SOME THRESHOLD FOR NEW VALUES*/"
    ).load()

    union_df = cached_data_df.union(external_df).coalesce(3).cache()
    union_df.createOrReplaceTempView('temp_table')

    # operate on union_df 

DStream.foreachRDD(forach_rdd)

Через несколько часов задание spark падает из-за переполнения стека;) Причина в том, что Скорее всего, это связано с растущим деревом зависимостей rdd под фреймом данных.

Мой вопрос: как заставить форсировать создание нового фрейма данных с обновленными данными, но без истории зависимостей.

Полагаю, что-то как показано ниже, будет работать, но это не кажется очень эффективным:

sc.parallelize(union_df.collect()).toDF(union_df.schema)

Есть ли лучший способ сделать это? Я бы приветствовал любые подсказки.

[править] Я загрузил трассировку стека исключений в pastebin, так как он немного длинный: https://pastebin.com/raw/3sPNdyUa

1 Ответ

1 голос
/ 28 апреля 2020

Начните с замены кэширования активной контрольной точкой:

union_df = cached_data_df.union(external_df).coalesce(3).checkpoint(True)

Это временно облегчит вашу проблему, но вы должны установить более надежные контрольные точки для потоковой передачи. Взгляните на документы .

...