pyspark Структурированный поток (2.4.2) - PullRequest
0 голосов
/ 08 июня 2019

Я использую Spark 2.4.2 на GCP DataProc и применяю агрегирование с сохранением состояния для потоковой передачи данных IOT с водяным знаком 1 дня, как показано ниже:

csvDF = sqlcontext \
    .readStream \
    .option("sep", ",") \
    .option("checkpointLocation", "gs://bucket_name/checkpoint") \
    .schema(schema) \
    .csv(bucket_path)

df_aggregated = csvDF \
                .withWatermark("date_time","1 day") \
                .groupBy(
                    csvDF.unique_device_id) \
                .agg(
                    sum(col('overall_measure1')),
                    sum(col('overall_measure2')),
                    sum(col('overall_measure3'))
                   )

def process_row(row):
          if row['overall_measure1'] >= 10000 : 
               #Write a file with a custom message to the gcs bucket
          print(row)
          pass

query = (

        df_aggregated.writeStream \
        .foreach(process_row)
        .outputMode("complete")
        .start()
    )

Моя цель - получить суммированное значение для каждой строки и проверить, выходят ли какие-либо из значений (total_measure1, total_measure2, total_measure3) за определенное значение, скажем, 10000 .if, поэтому я хочу написать несколько пользовательских сообщение в мое ведро GCS. Я пробовал ниже, там не хватает документации, поэтому я был бы признателен, если бы кто-нибудь мог дать мне знать, как это сделать.

Возникла проблема - Я не могу получить значение с помощью приемника foreach, и мой вопрос не в том, как записать пользовательский файл в корзину gcs.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...