[Структурированная потоковая передача]: запись потоковых данных в Postgres. - PullRequest
0 голосов
/ 19 февраля 2019

У меня есть потоковый фрейм данных, который я пытаюсь записать в базу данных.Есть документация для записи rdd или df в Postgres.Но я не могу найти примеры или документацию о том, как это делается в структурированной потоковой передаче.

Я прочитал документацию https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch, но не мог понять, где я могу создать соединение jdbc и как записать его в базу данных.

def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function)
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()

Эта функция принимает обычный фрейм данных и записывает в таблицу postgres

def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }

    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)

1 Ответ

0 голосов
/ 19 февраля 2019

Здесь действительно мало что можно сделать, помимо того, что у вас уже есть.foreachBatch принимает функцию (DataFrame, Int) => None, поэтому все, что вам нужно, это небольшой адаптер, а все остальное должно работать просто отлично:

def foreach_batch_for_config(config)
    def _(df, epoch_id):
        postgres_sink(config, df)
   return _

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append") 
    .foreachBatch(foreach_batch_for_config(some_config))
    ...,
    .start()
    .awaitTermination())

хотя, честно говоря, передача ConfigParserвокруг странная идея с самого начала.Вы можете настроить подпись и инициализировать ее на месте

def postgres_sink(data_frame, batch_id):
    config = configparser.ConfigParser()
    ...
    data_frame.write.jdbc(...)

и оставить все как есть.Таким образом, вы можете использовать вашу функцию напрямую:

...
.foreachBatch(postgres_sink)
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...