Преобразовать набор данных <String>, содержащий Json, в набор данных <StructType> - PullRequest
0 голосов
/ 23 мая 2019

Spark отлично разбирает JSON во вложенный StructType при первоначальном чтении с диска, но что, если у меня уже есть столбец String, содержащий JSON в Dataset, и я хочу отобразить его в Dataset со столбцом StructType, с выводом схемы, который учитывает весь набор данных, полностью используя параллельное состояние и избегая сокращения действий?

Я знаю функции schema_of_json и from_json, которыеочевидно, предназначенный для совместного использования для достижения этой цели, или что-то подобное, но у меня возникают проблемы с нахождением реальных примеров рабочего кода, особенно в Java.

Я приму любой ответ, который предоставляет пример Java и удовлетворяетцели полного вывода схемы и полной нередуцированной параллельной работы.Или, если это невозможно, ближайший обходной путь.

В настоящее время я использую Spark 2.4.0.

Я изучил следующий связанный вопрос:

Неявное обнаружение схемы в столбце Spark DataFrame в формате JSON

Этот вопрос похож на мой, хотя и для Scala.Там нет принятого ответа.ОП объявляет в комментарии, что они нашли «хакерское» решение, чтобы заставить from_schema работать.Проблема с решением, помимо «хакерства», заключается в том, что оно выводит схему только из первой строки кадра данных, поэтому типы могут быть слишком жестко ограничены:

val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first

РЕДАКТИРОВАТЬ: Я пробовал решение, указанное здесь , как описано в комментариях ниже.Вот реализация:

    SparkSession spark = SparkSession
            .builder()
            .appName("example")
            .master("local[*]")
            .getOrCreate();

    Dataset<Row> df = spark.read().text(conf.getSourcePath());

    df.cache();

    String schema = df.select(schema_of_json(col("value")))
          .as(Encoders.STRING())
          .first();
    df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
            .drop("value")
            .write()
            .mode("append")
            .parquet(conf.getDestinationPath());

Из этого кода я получил ошибку:

AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text

Эта ошибка привела меня к следующему запросу Spark pull: https://github.com/apache/spark/pull/22775

что, по-видимому, указывает на то, что schema_of_json никогда не предназначался для применения ко всей таблице, чтобы сделать вывод схемы для всего объекта, а вместо этого вывести схему из одного литерального образца JSON, переданного непосредственно с использованием lit("some json").В этом случае я не знаю, что Spark предлагает какое-либо решение для полного вывода схемы из JSON для всей таблицы.Если кто-то здесь не может исправить мое прочтение этого запроса на извлечение или предложить альтернативный подход ??

1 Ответ

0 голосов
/ 23 мая 2019

На самом деле это очень простое решение с использованием DataFrameReader.json(Dataset<String>), не знаю, почему оно не появилось в моих поисках:

    Dataset<String> ds = ...;

    spark.read()
        .json(ds)
        .write()
        .mode("append")
        .parquet(conf.getDestinationPath());

Если в исходном наборе данных несколько столбцов, очевидно, вы можете выбрать только тот, с которым будете работать. И тип контента должен быть String (например, не Row).

...