Я знаю, что невозможно просто обновить таблицу MySQL с помощью Spark, но я пытался что-то избежать, и это не работает.
Допустим, у меня есть таблица last_modification
, в которой я сохраняюимя пользователя в качестве идентификатора и даты последней модификации системы с различными сервисами.Каждый раз, когда я обрабатываю некоторые данные, я должен обновлять дату, когда данные этого пользователя были изменены, и, если новый пользователь входит в систему, я должен вставить их в таблицу.
Процесс:
Считайте данные из таблицы SQL:
df_last_mod = sqlContext.read.jdbc(url=url, table="last_modification", properties=properties)
Извлеките из этого DataFrame пользователей, которые будут обработаны (last_mod_actual
), и оставьте остальные вRDD (last_mod_aux
):
last_mod_actual = (df_last_mod
.rdd
.filter(lambda x: x[0] == service)
)
Обновить дату модификации из СДР обработанных пользователей (теперь она называется last_mod_rdd
) и присоединить ее к СДРпользователи, которые не были изменены:
union_rdd = last_mod_rdd.union(last_mod_aux)
Эта часть является дополнительной, чтобы не потерять данные, но не знает, можно ли их игнорировать.Здесь я создаю временную таблицу и кеширую ее:
header = (sqlContext
.createDataFrame(union_rdd,header_schema)
.createOrReplaceTempView("union_header")
)
sqlContext.cacheTable("union_header")
Наконец, я пишу таблицу с использованием JDBC:
dd = sqlContext.table("union_header")`
dd.write.format('jdbc').options(
url= url,
driver="com.mysql.jdbc.Driver",
dbtable="last_modification",
user=user,
password=password).mode('overwrite').save()
Этот кодкажется, работает несколько раз, но другие только сохраняют измененных пользователей и стирают неизмененных пользователей.При вставке dd.show()
перед записью в таблицу SQL программа, кажется, работает лучше, но на самом деле не знаю, почему, она работает немного случайным образом.