Загрузка в таблицу RDBMS (Mysql) с использованием Pyspark DataFrames и JDBC - PullRequest
0 голосов
/ 27 февраля 2020

Я пытаюсь выполнить операцию слияния (upsert) на MySql, используя Pyspark DataFrames и соединение JDB C. Следуйте приведенной ниже статье, чтобы сделать то же самое, что в scala. (https://medium.com/@thomaspt748 / как вставить данные в реляционную базу данных с использованием - apache -spark-part-2-45a9d49d0f43 ).

Но мне нужно выполнить операции upsert, используя pyspark, который застрял при переборе Pyspark Dataframe для вызова функции upsert, как показано ниже. Необходимо передать исходный фрейм данных в качестве входных данных для чтения в качестве параметров и выполнить sql upsert. (Проще говоря, выполнение sql upsert с использованием фрейма данных pyspark)

def upsertToDelta(id, name, price, purchase_date):
  try: 
      connection = mysql.connector.connect(host='localhost',
                                             database='Electronics',
                                             user='pynative',
                                             password='pynative@#29')
        cursor = connection.cursor()
        mySql_insert_query = "MERGE INTO targetTable USING
                                VALUES (%s, %s, %s, %s) as INSROW((Id, Name, Price, Purchase_date)
                                WHEN NOT MATCHED THEN INSERT VALUES (INSROW.Id,INSROW.Price,INSROW.Purchase,INSROW.Purchase_date) 
                                WHEN MATCHED THEN UPDATE SET set Name=INSROW.Name"

        recordTuple = (id, name, price, purchase_date)
        cursor.execute(mySql_insert_query, recordTuple)
        connection.commit()
      print("Record inserted successfully into test table")
    except mysql.connector.Error as error:
        print("Failed to insert into MySQL table {}".format(error))     
**
dataFrame.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()
**

Любая помощь по этому вопросу высоко ценится.

...