Создать и добавить таблицу в блоки данных, используя вложенные данные из json - PullRequest
1 голос
/ 18 февраля 2020

Я генерирую серию данных, вложенных в json, и хотел бы автоматически добавить эти файлы в таблицу внутри блоков данных. У меня нет ее схемы ... эти данные будут go до azure хранилища.

%python
# !/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import sys
import json
import os
import pandas as pd

def create_table():
    qry = """
    CREATE TEMPORARY TABLE vsts 
    USING json
    OPTIONS (path 'dbfs:/mnt/lake/vsts/*.json')
    """
    return spark.sql(qry)

if __name__ == "__main__":
    create_table()

Я попытался создать временную таблицу и затем поместить новые данные в постоянную таблицу, но это процесс не сработал. По сути, мне нужно вставить сотни файлов, которые будут созданы с данными, вложенными в одну таблицу.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import sys
import json
import os

def get_clear_default():
    qry_default = """
    select * from vsts
    """
    return spark.sql(qry_default)

def create_table():
    qry = """
    CREATE TABLE IF NOT EXISTS `db_sandbox`.`tst_vsts` 
    USING JSON
    OPTIONS (
    path 'dbfs:/mnt/lake/vsts/*.json'
    )
    """
    return spark.sql(qry)

if __name__ == "__main__":
    create_table()

1 Ответ

0 голосов
/ 19 февраля 2020

Вероятно, мы можем попробовать этот маршрут.

df=spark.read.option("multiline", "true").json("<Azure_Path>")

df.createOrReplaceTempView("test")

Затем вы можете начать использовать эту 'тестовую' таблицу в наборе Spark SQL для создания таблицы.

Если вы хотите, чтобы разделите массив элементов во вложенном JSON, затем попробуйте POS Explode для денормализации их в таблицу.

        spark.sql("SELECT \
              n.pos AS position, \
              n.<unique_field> AS <unique_field>, \
              <field1>[pos] AS <field1>, \
              <field2>[pos] AS <field2>, \
              <field3>[pos] AS <field3>\
       FROM \
              test \
       LATERAL VIEW POSEXPLODE(<parent_field>.<unique_field>) n AS pos, <unique_field>").show()
...