Я использую спарк 2.4.4 в блокноте данных ноутбука. У меня есть данные в фрейме данных, которые я хочу использовать для обновления записей в таблице Postgre. Я придерживаюсь подхода, приведенного в этом посте Spark Dataframes UPSERT to Postgres Table
Вот мой код
import spark.implicits._
val update_query = s"""UPDATE scored_fact.f_learner_assessment_item_response_classifications_test SET is_deleted = ? where f.learner_assigned_item_classification_attempt_sk = ?::uuid AND f.root_org_partition= ?::int"""
changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection(connectionUrl)
val stmt: PreparedStatement = dbc.prepareStatement(update_query)
batch.grouped(100).foreach { session =>
session.foreach { row =>
stmt.setBoolean( 0, row.getAs[Boolean]("is_deleted") )
stmt.setString( 1, row.getAs[String]("learner_assigned_item_classification_attempt_sk"))
stmt.setString( 2, row.getAs[String]("root_org_partition"))
stmt.addBatch()
}
stmt.executeBatch()
}
dbc.close()
}
Я получаю ошибку ниже
Unable to find encoder for type Iterator[org.apache.spark.sql.Row]. An implicit Encoder[Iterator[org.apache.spark.sql.Row]] is needed to store Iterator[org.apache.spark.sql.Row] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
Я уверен, что что-то упустил. Как я могу устранить эту ошибку, создав кодировщик