Sparksql получает исключение тайм-аута трансляции - PullRequest
0 голосов
/ 15 октября 2019

У меня следующий код pyspark для агрегирования подсчета чего-либо

df_total_asin_count_stat = df_product_full_asin.agg(count(col("asin")).alias("totalAsinCount")).withColumn("batchId", lit(batch_name)).cache()
df_delta_asin_count_stat = df_product_delta_asin.agg(count(col("asin")).alias("deltaAsinCount")).withColumn("batchId", lit(batch_name)).cache()

Затем я сделал внутреннее соединение, сбросил в базу данных

df_stat = df_total_asin_count_stat.join(df_delta_asin_count_stat, "batchId", "inner").withColumn("rankType", lit(rank_type))
df_stat.write.jdbc(url=jdbc_url, table='"stats"', mode="append", properties=jdbc_properties)

Мой вопрос: почему я получаю "Не удалось выполнить широковещательную рассылку за 300 секунд », если я не использовал cache () за двумя функциями agg? Но это хорошо работает, если я просто добавляю «cache ()» за двумя кадрами данных. Требуемое объединение таблиц на самом деле имеет только одну строку, трансляция не должна занимать более 300 секунд.

Я знаю, что здесь означает cache (). Что меня смущает, так это то, что cache () не будет иметь никакого эффекта, так как у меня есть только одно «действие» (write.jdbc), почему spark дает другой результат, просто добавив сюда cache ()?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...