У меня есть следующий код, он выполняется без каких-либо ошибок, однако он не заполняет таблицу meter_reading в моей базе данных, я подозреваю, что проблема может заключаться в том, что я где-то пропускаю .awaitTermination (). Когда я пытаюсь добавить это в строку в конце, как показано ниже:
query = df.writeStream.foreachBatch (process_row) .start () \ .awaitTermination ()
Код бомбы со стеком ошибок, утверждающим, что foreachBatch принимает только один параметр.
Мой вопрос: где должен awaitTermination () go?
import sys
from pyspark.sql import types, SparkSession, DataFrame, Row
from pyspark.sql.types import *
user= "username"
password = "****"
database = "MeterReportDatabase"
sourceDir = "/user/london_smart_meter/halfhourly_dataset"
datasource_name = "SqlDataPool"
sparkSession = SparkSession.builder.master("local")\
.appName("SparkStreamingAppendMode")\
.getOrCreate()
schema = StructType([StructField('LCLid' , StringType(), True),
StructField('tstp' , TimestampType(), True),
StructField('energy', DoubleType(), True)])
hostname = "master-0.master-svc"
port = 1433
db_target_url = "jdbc:sqlserver://%s:%d;database=%s;user=%s;password=%s;" %(hostname, port, database, user, password)
db_target_url
df = sparkSession.readStream\
.option("header", "true")\
.schema(schema)\
.csv("/user/london_smart_meter/halfhourly_dataset")
def process_row(row):
row.write.jdbc(url=db_target_url, table="meter_reading", mode="append")
pass
query = df.writeStream.foreachBatch(process_row).start()