Spark: преобразование файлов JSON в правильный формат - PullRequest
0 голосов
/ 27 апреля 2018

У меня более 100 миллионов записей, хранящихся в файлах со следующей структурой JSON (реальные данные имеют гораздо больше столбцов, строк и также вложены)

{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}

Функция sqlContext.read.json не может разобрать это, поскольку записи находятся не на нескольких строках, а на одной большой строке. Приведенное ниже решение решает эту проблему, но сильно снижает производительность. Как лучше всего с точки зрения производительности решить эту проблему в Apache Spark?

val rdd = sc.wholeTextFiles("s3://some-bucket/**/*")
val validJSON = rdd.flatMap(_._2.replace("}{", "}\n{").split("\n"))

val df = sqlContext.read.json(validJSON)

df.count()
df.select("id").show()

Ответы [ 2 ]

0 голосов
/ 30 апреля 2018

Это риф на ответ Анто , который должен обрабатывать вложенный JSON

input.toVector
    .foldLeft((false, Vector.empty[Char], Vector.empty[String])) {
      case ((true, charAccum, strAccum), '{') => (false, Vector('{'), strAccum :+ charAccum.mkString);
      case ((_, charAccum, strAccum), '}')    => (true, charAccum :+ '}', strAccum);
      case ((_, charAccum, strAccum), char)   => (false, charAccum :+ char, strAccum)
    }
    ._3

По сути, он разбивает данные на Vector[Char] и использует foldLeft для объединения входных данных в подстроки. Хитрость заключается в том, чтобы отслеживать достаточно информации о предыдущем символе, чтобы выяснить, означает ли { начало нового объекта.

Я использовал этот вход для его проверки (в основном образец ввода ОП, с вложенным вложенным объектом):

val input = """{"id":"2-2-3","key":{ "test": "value"}}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}{"id":"2-2-3","key":"value"}"""

И получил такой результат, который выглядит неплохо:

Vector({"id":"2-2-3","key":{ "test": "value"}}, 
       {"id":"2-2-3","key":"value"}, 
       {"id":"2-2-3","key":"value"}, 
       {"id":"2-2-3","key":"value"})
0 голосов
/ 27 апреля 2018

Проблема с исходным подходом заключается в вызове _._2.replace("}{", "}\n{", который создает еще одну огромную строку из входной, с вставленными символами новой строки, которая затем снова разделяется на массив.

Улучшение возможно за счет минимизации создания промежуточных строк и извлечения целевых строк как можно скорее. Для этого мы можем немного поиграть с подстроками:

val validJson = rdd.flatMap(rawJson => {

  // functions extracted to make it more readable.
  def nextObjectStartIndex(fromIndex: Int):Int = rawJson._2.indexOf('{', fromIndex)
  def currObjectEndIndex(fromIndex: Int): Int = rawJson._2.indexOf('}', fromIndex)
  def extractObject(fromIndex: Int, toIndex: Int): String = rawJson._2.substring(fromIndex, toIndex + 1)

  // the resulting strings are put in a local buffer
  val buffer = new ListBuffer[String]()

  // init the scanning of the input string
  var posStartNextObject = nextObjectStartIndex(0)

  // main loop terminates when there are no more '{' chars
  while (posStartNextObject != -1) {
    val posEndObject = currObjectEndIndex(posStartNextObject)
    val extractedObject = extractObject(posStartNextObject, posEndObject)
    posStartNextObject = nextObjectStartIndex(posEndObject)
    buffer += extractedObject
  }

  buffer
})

Обратите внимание, что этот подход будет работать, только если объекты во входном JSON не являются вложенными, предполагая, что все фигурные скобки разделяют объекты одного уровня.

...