Я пытаюсь запустить приложение структурированной потоковой передачи, используя (py) spark. Мои данные считываются с топки Kafka c, а затем я запускаю оконную агрегацию во время события.
# I have been able to create data frame pn_data_df after reading data from Kafka
Schema of pn_data_df
|
- id StringType
- source StringType
- source_id StringType
- delivered_time TimeStamp
windowed_report_df = pn_data_df.filter(pn_data_df.source == 'campaign') \
.withWatermark("delivered_time", "24 hours") \
.groupBy('source_id', window('delivered_time', '15 minute')) \
.count()
windowed_report_df = windowed_report_df \
.withColumn('start_ts', unix_timestamp(windowed_report_df.window.start)) \
.withColumn('end_ts', unix_timestamp(windowed_report_df.window.end)) \
.selectExpr('CAST(source_id as LONG)', 'start_ts', 'end_ts', 'count')
Я записываю это оконное агрегирование в мою базу данных postgresql, которую я уже создал.
CREATE TABLE pn_delivery_report(
source_id bigint not null,
start_ts bigint not null,
end_ts bigint not null,
count integer not null,
unique(source_id, start_ts)
);
Запись в postgresql с использованием spark jdb c позволяет мне либо Append
, либо Overwrite
. Режим добавления завершается ошибкой, если в базе данных существует существующий составной ключ, а при перезаписи просто перезаписывается вся таблица с текущим пакетным выводом.
def write_pn_report_to_postgres(df, epoch_id):
df.write \
.mode('append') \
.format('jdbc') \
.option("url", "jdbc:postgresql://db_endpoint/db") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "pn_delivery_report") \
.option("user", "postgres") \
.option("password", "PASSWORD") \
.save()
windowed_report_df.writeStream \
.foreachBatch(write_pn_report_to_postgres) \
.option("checkpointLocation", '/home/hadoop/campaign_report_df_windowed_checkpoint') \
.outputMode('update') \
.start()
Как выполнить запрос наподобие
INSERT INTO pn_delivery_report (source_id, start_ts, end_ts, COUNT)
VALUES (1001, 125000000001, 125000050000, 128),
(1002, 125000000001, 125000050000, 127) ON conflict (source_id, start_ts) DO
UPDATE
SET COUNT = excluded.count;
в foreachBatch
.
В Spark есть открытый билет для функции jira, но, похоже, что это не было приоритетом до сих пор.
https://issues.apache.org/jira/browse/SPARK-19335