1) Я читаю таблицу из Postgres, как показано ниже, и создаю фрейм данных
df = spark.read.format("jdbc").option("url", url). \
option("query", "SELECT * FROM test_spark"). \
load()
2) Обновляю одно значение в фрейме данных df
newDf = df.withColumn('id',F.when(df['id']==10,20).otherwise(df['id']))
3) I Я пытаюсь вернуть данные в таблицу Postgres.
- Код ниже очищает данные таблицы
newDf.write.mode("overwrite").option("upsert", True).\
option("condition_columns", "id").option("truncate", True).\
format("jdbc").option("url", url).option("dbtable", "test_spark").save()
- Код ниже работает нормально.
newDf.write.mode("overwrite").option("upsert", True).\
option("condition_columns", "id").option("truncate", True).\
format("jdbc").option("url", url).option("dbtable", "test_spark1").save()
Проблема: Когда я пытаясь записать обновленный фрейм данных обратно в ту же таблицу (т.е. test_spark), данные таблицы очищаются, но когда это новая таблица (т.е. несуществующая таблица), она работает нормально.