Я использую Spark 2.4.2 на GCP DataProc и применяю агрегирование с сохранением состояния для потоковой передачи данных IOT с водяным знаком 1 дня, как показано ниже:
csvDF = sqlcontext \
.readStream \
.option("sep", ",") \
.option("checkpointLocation", "gs://bucket_name/checkpoint") \
.schema(schema) \
.csv(bucket_path)
df_aggregated = csvDF \
.withWatermark("date_time","1 day") \
.groupBy(
csvDF.unique_device_id) \
.agg(
sum(col('overall_measure1')),
sum(col('overall_measure2')),
sum(col('overall_measure3'))
)
def process_row(row):
if row['overall_measure1'] >= 10000 :
#Write a file with a custom message to the gcs bucket
print(row)
pass
query = (
df_aggregated.writeStream \
.foreach(process_row)
.outputMode("complete")
.start()
)
Моя цель - получить суммированное значение для каждой строки и проверить, выходят ли какие-либо из значений (total_measure1, total_measure2, total_measure3) за определенное значение, скажем, 10000 .if, поэтому я хочу написать несколько пользовательских сообщение в мое ведро GCS. Я пробовал ниже, там не хватает документации, поэтому я был бы признателен, если бы кто-нибудь мог дать мне знать, как это сделать.
Возникла проблема - Я не могу получить значение с помощью приемника foreach, и мой вопрос не в том, как записать пользовательский файл в корзину gcs.