Pyspark: запись данных в Postgres с использованием JDBC - PullRequest
0 голосов
/ 19 июня 2020

1) Я читаю таблицу из Postgres, как показано ниже, и создаю фрейм данных

df = spark.read.format("jdbc").option("url", url). \
                        option("query", "SELECT * FROM test_spark"). \
                            load()

2) Обновляю одно значение в фрейме данных df

newDf = df.withColumn('id',F.when(df['id']==10,20).otherwise(df['id']))

3) I Я пытаюсь вернуть данные в таблицу Postgres.

- Код ниже очищает данные таблицы

newDf.write.mode("overwrite").option("upsert", True).\
        option("condition_columns", "id").option("truncate", True).\
        format("jdbc").option("url", url).option("dbtable", "test_spark").save()

- Код ниже работает нормально.

newDf.write.mode("overwrite").option("upsert", True).\
        option("condition_columns", "id").option("truncate", True).\
        format("jdbc").option("url", url).option("dbtable", "test_spark1").save()

Проблема: Когда я пытаясь записать обновленный фрейм данных обратно в ту же таблицу (т.е. test_spark), данные таблицы очищаются, но когда это новая таблица (т.е. несуществующая таблица), она работает нормально.

1 Ответ

0 голосов
/ 22 июня 2020

Проблема решена путем записи фрейма данных в каталог контрольной точки перед его записью в таблицу БД, как показано в приведенном ниже коде.

     sparkContext.setCheckpointDir('checkpoints')
     newDf.checkpoint().write.format("jdbc").option("url", url).option("truncate", "true").mode("overwrite").\
                option("dbtable", "spark_test").save()
...