На Dataproc запущено приложение потоковой передачи с плавающей точкой. Я применяю базовую группировку и агрегирование подсчетов к исходному фрейму данных, как показано ниже:
df_windowed_counts = df_visits \
.withWatermark("date_time", "10 minutes") \
.groupBy(
window(df_visits.date_time, "10 minutes", "5 minutes"),
df_visits.date_time,
df_visits.exact_time,
df_visits.state,
df_visits.type,
df_visits.sub_cat) \
.count()
query = df_windowed_counts.writeStream \
.trigger(processingTime="15 seconds") \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
вышеописанное работает нормально при печати на консоль, ноя хочу, чтобы этот dataframe (df_windowed_counts) был сохранен в моем gcs ведре.Поэтому я сделал следующее и столкнулся с ошибкой:
query = df_windowed_counts.writeStream \
.format("parquet") \
.option("checkpointLocation","gs://bucket_name/spark-agg-checkpoints") \
.option("path","gs://bucket_name/aggregated_streaming_data") \
.option("failOnDataLoss","false") \
.start()
**
Добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames / DataSets без водяного знака
Я даже попробовал следующее после того, как получил вышеуказанную ошибку:
def foreach_batch_function(df,epoch_id) :
df.writeStream \
.format("parquet") \
.option("checkpointLocation","gs://bucket_name/spark-agg-checkpoints") \
.option("path","gs://bucket_name/aggregated_streaming_data") \
.option("failOnDataLoss","false") \
.start()
df_windowed_counts.writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()
Кажется, что ничего не работает.Пожалуйста, посоветуйте, что происходит не так, поскольку я не вижу достаточного количества документации по этому вопросу, только теоретические объяснения.