Spark не может записать, а затем прочитать данные в формате JSON с обнуляемым столбцом - PullRequest
0 голосов
/ 27 августа 2018

Я пытаюсь настроить spark для нового проекта, и у меня есть несколько классов дел, сгенерированных из схем в других местах моей компании, которые я хочу использовать в качестве шаблона для чтения / записи в различных форматах (parquet и json)

Я заметил проблему в json с одним из наших полей, которое является Option [String]. Соответствующие данные обычно нулевые, но иногда нет. Когда я тестирую с подмножествами этих данных, есть хороший шанс, что все строки в этом столбце имеют нулевое значение. Кажется, что Spark обнаруживает это и пропускает столбец для любых строк, которые имеют нулевые значения для этих данных.

Когда я читаю, пока любая строка имеет соответствующие данные, spark выбирает схему и может перевести ее обратно в класс case. Но если никого из них там нет, спарк видит пропущенную колонку и терпит неудачу.

Вот код, демонстрирующий это.

import org.apache.spark.sql.SparkSession

object TestNulls {
  case class Test(str: Option[String])
  def main(args: Array[String]) {
    val spark: SparkSession = SparkSession
      .builder()
      .getOrCreate()
    import spark.implicits._

    val dataset = Seq(
      Test(None),
      Test(None),
      Test(None)
    ).toDS()

    // Because all rows are null, writes {} for all rows
    dataset.write.json("testpath")

    // Fails because column `test` does not exist, even though it is an option
    spark.read.json("testpath").as[Test].show()
  }
}

Есть ли способ сказать спарку, чтобы он не потерпел неудачу в пропущенном обнуляемом столбце? В противном случае, есть ли читабельный формат, который я могу использовать, который не будет демонстрировать такое поведение? Json предназначен главным образом для того, чтобы мы могли писать файлы, читаемые человеком, для тестирования и для случаев локальной разработки

1 Ответ

0 голосов
/ 27 августа 2018

Вы можете использовать класс case для извлечения схемы из кодировщика, а затем передать ее при чтении

val schema = implicitly[Encoder[Test]].schema
spark.read.schema(schema).json("testpath")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...