Не удается выполнить задание Scala Spark для загрузки данных файла CSV в базу данных mongo из-за исключения CodecConfigurationException - PullRequest
0 голосов
/ 13 июня 2019

Я новичок и в искре, и в скале.Я пытаюсь загрузить CSV-файл в базу данных Mongo, используя искровое задание в Scala.

При загрузке возникает следующая ошибка во время выполнения задания:

org.bson.codecs.configuration.CodecConfigurationException: Не удается найти кодек для класса.

Путь к входному файлу будет передан во время выполнения.

Я как-то застрял с этой проблемой в течение последних 2 дней.Любая помощь в преодолении этой проблемы приветствуется.

Спасибо.

Я пробовал его для загрузки в эластичный поиск, и он работал как шарм.

import org.apache.spark.sql.Row
import com.mongodb.spark._
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.test.Config


object MongoUpload {
    val host = <host>
    val user = <user>
    val pwd = <password>
    val database = <db>
    val collection = <collection>
    val uri = "mongodb://${user}:${pwd}@${host}/"
    val NOW = java.time.LocalDate.now.toString

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Mongo-Test-Upload")
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()

    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .saveToMongoDB(
        WriteConfig(
            Map(
                "uri" -> uri,
                "database" -> database,
                "collection" -> collection
            )
        )
      )
   }


  def toEligibility(row: Row): Eligibility =
    Eligibility(
      row.getAs[String]("DATE_OF_BIRTH"),
      row.getAs[String]("GENDER"),
      row.getAs[String]("INDIVIDUAL_ID"),
      row.getAs[String]("PRODUCT_NAME"),
      row.getAs[String]("STATE_CODE"),
      row.getAs[String]("ZIPCODE"),
      NOW
    )
}

case class Eligibility (
  dateOfBirth: String,
  gender: String,
  recordId: String,
  ProductIdentifier: String,
  stateCode: String,
  zipCode: String,
  updateDate: String
)

Работа Sparkзавершается со следующей ошибкой, вызванной: org.bson.codecs.configuration.CodecConfigurationException: Не удается найти кодек для класса Eligibility

1 Ответ

0 голосов
/ 13 июня 2019

Вы можете либо сопоставить Document желаемого формата, либо преобразовать в Dataset, а затем сохранить его, например:

    import spark.implicits._
    spark
      .read
      .format("csv")
      .option("header", "true")
      .load(args(0))
      .rdd
      .map(toEligibility)
      .toDS()
      .write()
      .format("com.mongodb.spark.sql.DefaultSource")
      .options(Map("uri" -> uri,"database" -> database, "collection" -> collection)
      .save()
   }
...