Я пытаюсь выполнить операцию слияния (upsert) на MySql, используя Pyspark DataFrames и соединение JDB C. Следуйте приведенной ниже статье, чтобы сделать то же самое, что в scala. (https://medium.com/@thomaspt748 / как вставить данные в реляционную базу данных с использованием - apache -spark-part-2-45a9d49d0f43 ).
Но мне нужно выполнить операции upsert, используя pyspark, который застрял при переборе Pyspark Dataframe для вызова функции upsert, как показано ниже. Необходимо передать исходный фрейм данных в качестве входных данных для чтения в качестве параметров и выполнить sql upsert. (Проще говоря, выполнение sql upsert с использованием фрейма данных pyspark)
def upsertToDelta(id, name, price, purchase_date):
try:
connection = mysql.connector.connect(host='localhost',
database='Electronics',
user='pynative',
password='pynative@#29')
cursor = connection.cursor()
mySql_insert_query = "MERGE INTO targetTable USING
VALUES (%s, %s, %s, %s) as INSROW((Id, Name, Price, Purchase_date)
WHEN NOT MATCHED THEN INSERT VALUES (INSROW.Id,INSROW.Price,INSROW.Purchase,INSROW.Purchase_date)
WHEN MATCHED THEN UPDATE SET set Name=INSROW.Name"
recordTuple = (id, name, price, purchase_date)
cursor.execute(mySql_insert_query, recordTuple)
connection.commit()
print("Record inserted successfully into test table")
except mysql.connector.Error as error:
print("Failed to insert into MySQL table {}".format(error))
**
dataFrame.writeStream \
.format("delta") \
.foreachBatch(upsertToDelta) \
.outputMode("update") \
.start()
**
Любая помощь по этому вопросу высоко ценится.