Как создать кодировщик для типа Iterator [org.apache.spark.sql.Row] - PullRequest
1 голос
/ 01 ноября 2019

Я использую спарк 2.4.4 в блокноте данных ноутбука. У меня есть данные в фрейме данных, которые я хочу использовать для обновления записей в таблице Postgre. Я придерживаюсь подхода, приведенного в этом посте Spark Dataframes UPSERT to Postgres Table

Вот мой код

import spark.implicits._ 

val update_query = s"""UPDATE scored_fact.f_learner_assessment_item_response_classifications_test SET is_deleted = ? where f.learner_assigned_item_classification_attempt_sk = ?::uuid AND f.root_org_partition= ?::int"""


changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
  val dbc: Connection = DriverManager.getConnection(connectionUrl)
  val stmt: PreparedStatement = dbc.prepareStatement(update_query)

  batch.grouped(100).foreach { session =>
    session.foreach { row =>
      stmt.setBoolean( 0, row.getAs[Boolean]("is_deleted") )
     stmt.setString( 1, row.getAs[String]("learner_assigned_item_classification_attempt_sk"))
     stmt.setString( 2, row.getAs[String]("root_org_partition"))
     stmt.addBatch()
    }
    stmt.executeBatch()
  }
  dbc.close()
}

Я получаю ошибку ниже

Unable to find encoder for type Iterator[org.apache.spark.sql.Row]. An implicit Encoder[Iterator[org.apache.spark.sql.Row]] is needed to store Iterator[org.apache.spark.sql.Row] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>

Я уверен, что что-то упустил. Как я могу устранить эту ошибку, создав кодировщик

1 Ответ

0 голосов
/ 01 ноября 2019

Подпись mapPartitions равна

def mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U] 

Так что d в d => Iterator(d) - это Iterator[Row], и функция возвращает Dataset[Iterator[Row]], который не может существовать разумным образом.

Я думаю, что вызов mapPartitions просто неправильный, и .mapPartitions((d) => Iterator(d)).foreach следует заменить на foreachPartition (как говорится в комментарии @ shridharama к связанному ответу).

...