Где поставить awaitTermination () при использовании foreachBatch - потоковая передача pyspark - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть следующий код, он выполняется без каких-либо ошибок, однако он не заполняет таблицу meter_reading в моей базе данных, я подозреваю, что проблема может заключаться в том, что я где-то пропускаю .awaitTermination (). Когда я пытаюсь добавить это в строку в конце, как показано ниже:

query = df.writeStream.foreachBatch (process_row) .start () \ .awaitTermination ()

Код бомбы со стеком ошибок, утверждающим, что foreachBatch принимает только один параметр.

Мой вопрос: где должен awaitTermination () go?

import sys
from pyspark.sql import types, SparkSession, DataFrame, Row
from pyspark.sql.types import *

user= "username"
password  = "****"
database  =  "MeterReportDatabase"
sourceDir = "/user/london_smart_meter/halfhourly_dataset"
datasource_name = "SqlDataPool"

sparkSession = SparkSession.builder.master("local")\
                                   .appName("SparkStreamingAppendMode")\
                                   .getOrCreate()

schema = StructType([StructField('LCLid' , StringType(), True),
                     StructField('tstp'  , TimestampType(), True),
                     StructField('energy', DoubleType(), True)])

hostname = "master-0.master-svc"
port = 1433
db_target_url = "jdbc:sqlserver://%s:%d;database=%s;user=%s;password=%s;" %(hostname, port, database, user, password)
db_target_url

df = sparkSession.readStream\
                 .option("header", "true")\
                 .schema(schema)\
                 .csv("/user/london_smart_meter/halfhourly_dataset")

def process_row(row):
    row.write.jdbc(url=db_target_url, table="meter_reading", mode="append")
    pass
query = df.writeStream.foreachBatch(process_row).start()
...