Как определить или перенаправить плохие XML при чтении XML с помощью искры - PullRequest
1 голос
/ 13 июня 2019

Используя spark, я пытаюсь прочитать несколько xmls по пути, один из файлов - фиктивный файл, который не является xml.

Я бы хотел, чтобы искра сказала мне, что один конкретныйфайл недействителен, в любом случае

Добавление "badRecordsPath" otiton записывает неверные данные в указанное место для файлов JSON, но то же самое не работает для xml, есть ли другой способ?

df = (spark.read.format('json')
      .option('badRecordsPath','/tmp/data/failed')
      .load('/tmp/data/dummy.json')

1 Ответ

1 голос
/ 13 июня 2019

Насколько я знаю .... К сожалению, до сегодняшнего дня он не был доступен в xml пакете spark в декларативном виде ... так, как вы ожидаете ...

Json работал, так как FailureSafeParser был реализован, как показано ниже ... в DataFrameReader

/**
   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
   *
   * Unless the schema is specified using `schema` function, this function goes through the
   * input once to determine the input schema.
   *
   * @param jsonDataset input Dataset with one JSON object per record
   * @since 2.2.0
   */
  def json(jsonDataset: Dataset[String]): DataFrame = {
    val parsedOptions = new JSONOptions(
      extraOptions.toMap,
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)

    val schema = userSpecifiedSchema.getOrElse {
      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
    }

    ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
    val actualSchema =
      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

    val createParser = CreateJacksonParser.string _
    val parsed = jsonDataset.rdd.mapPartitions { iter =>
      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
      val parser = new FailureSafeParser[String](
        input => rawParser.parse(input, createParser, UTF8String.fromString),
        parsedOptions.parseMode,
        schema,
        parsedOptions.columnNameOfCorruptRecord)
      iter.flatMap(parser.parse)
    }
    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
  }

вы можете реализовать функцию программным способом.
прочитайте все файлы в папке, используя sc.textFile.файл foreach с помощью парсера xml разбирает записи.

Если допустимо перенаправление на другой путь.

Если он недействителен, введите неправильный путь записи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...