Обновление таблицы MySQL с помощью pyspark - PullRequest
0 голосов
/ 05 июня 2018

Я знаю, что невозможно просто обновить таблицу MySQL с помощью Spark, но я пытался что-то избежать, и это не работает.

Допустим, у меня есть таблица last_modification, в которой я сохраняюимя пользователя в качестве идентификатора и даты последней модификации системы с различными сервисами.Каждый раз, когда я обрабатываю некоторые данные, я должен обновлять дату, когда данные этого пользователя были изменены, и, если новый пользователь входит в систему, я должен вставить их в таблицу.

Процесс:

  1. Считайте данные из таблицы SQL:

    df_last_mod = sqlContext.read.jdbc(url=url, table="last_modification", properties=properties)
    
  2. Извлеките из этого DataFrame пользователей, которые будут обработаны (last_mod_actual), и оставьте остальные вRDD (last_mod_aux):

    last_mod_actual = (df_last_mod
                   .rdd
                   .filter(lambda x: x[0] == service)
                   )
    
  3. Обновить дату модификации из СДР обработанных пользователей (теперь она называется last_mod_rdd) и присоединить ее к СДРпользователи, которые не были изменены:

    union_rdd = last_mod_rdd.union(last_mod_aux)
    
  4. Эта часть является дополнительной, чтобы не потерять данные, но не знает, можно ли их игнорировать.Здесь я создаю временную таблицу и кеширую ее:

    header = (sqlContext
               .createDataFrame(union_rdd,header_schema)
               .createOrReplaceTempView("union_header")
               )
    sqlContext.cacheTable("union_header")
    
  5. Наконец, я пишу таблицу с использованием JDBC:

    dd = sqlContext.table("union_header")`
    
    dd.write.format('jdbc').options(
           url= url,
           driver="com.mysql.jdbc.Driver",
           dbtable="last_modification",
           user=user,
           password=password).mode('overwrite').save()
    

Этот кодкажется, работает несколько раз, но другие только сохраняют измененных пользователей и стирают неизмененных пользователей.При вставке dd.show() перед записью в таблицу SQL программа, кажется, работает лучше, но на самом деле не знаю, почему, она работает немного случайным образом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...