Pyspark - структурированный поток, сохранение агрегированного Df в hdfs - PullRequest
0 голосов
/ 25 мая 2019

На Dataproc запущено приложение потоковой передачи с плавающей точкой. Я применяю базовую группировку и агрегирование подсчетов к исходному фрейму данных, как показано ниже:

 df_windowed_counts = df_visits \
        .withWatermark("date_time", "10 minutes") \
        .groupBy(
            window(df_visits.date_time, "10 minutes", "5 minutes"),
            df_visits.date_time,
            df_visits.exact_time,
            df_visits.state,
            df_visits.type,
            df_visits.sub_cat) \
        .count() 

    query = df_windowed_counts.writeStream \
            .trigger(processingTime="15 seconds") \
            .outputMode("complete") \
            .format("console") \
            .start()

   query.awaitTermination()

вышеописанное работает нормально при печати на консоль, ноя хочу, чтобы этот dataframe (df_windowed_counts) был сохранен в моем gcs ведре.Поэтому я сделал следующее и столкнулся с ошибкой:

query = df_windowed_counts.writeStream \
            .format("parquet") \
            .option("checkpointLocation","gs://bucket_name/spark-agg-checkpoints") \
            .option("path","gs://bucket_name/aggregated_streaming_data") \
            .option("failOnDataLoss","false") \
            .start()

**

Добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames / DataSets без водяного знака

Я даже попробовал следующее после того, как получил вышеуказанную ошибку:

def foreach_batch_function(df,epoch_id) :

  df.writeStream \
    .format("parquet") \
    .option("checkpointLocation","gs://bucket_name/spark-agg-checkpoints") \
    .option("path","gs://bucket_name/aggregated_streaming_data") \
    .option("failOnDataLoss","false") \
    .start()


df_windowed_counts.writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()

Кажется, что ничего не работает.Пожалуйста, посоветуйте, что происходит не так, поскольку я не вижу достаточного количества документации по этому вопросу, только теоретические объяснения.

...