У меня есть потоковый фрейм данных, который я пытаюсь записать в базу данных.Есть документация для записи rdd или df в Postgres.Но я не могу найти примеры или документацию о том, как это делается в структурированной потоковой передаче.
Я прочитал документацию https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch, но не мог понять, где я могу создать соединение jdbc и как записать его в базу данных.
def foreach_batch_function(df, epoch_id):
# what goes in here?
pass
view_counts_query = windowed_view_counts.writeStream \
.outputMode("append") \
.foreachBatch(foreach_batch_function)
.option("truncate", "false") \
.trigger(processingTime="5 seconds") \
.start() \
.awaitTermination()
Эта функция принимает обычный фрейм данных и записывает в таблицу postgres
def postgres_sink(config, data_frame):
config.read('/src/config/config.ini')
dbname = config.get('dbauth', 'dbname')
dbuser = config.get('dbauth', 'user')
dbpass = config.get('dbauth', 'password')
dbhost = config.get('dbauth', 'host')
dbport = config.get('dbauth', 'port')
url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
properties = {
"driver": "org.postgresql.Driver",
"user": dbuser,
"password": dbpass
}
data_frame.write.jdbc(url=url, table="metrics", mode="append",
properties=properties)