Есть ли способ получить доступ к нескольким объектам JSON в массиве (структура) по одному в pyspark - PullRequest
0 голосов
/ 04 июня 2019

Я немного новичок в разборе pyspark и json и застрял в каком-то определенном сценарии. Позвольте мне сначала объяснить, что я пытаюсь сделать, у меня есть файл json, в котором есть элемент данных, этот элемент данных является массивом, который содержит два других объекта json. Данный файл json ниже

 {
    "id": "da20d14c.92ba6",
    "type": "Data Transformation Node",
    "name": "",
    "topic": "",
    "x": 380,
    "y": 240,
    "typeofoperation":"join",
    "wires": [
        ["da20d14c.92ba6","da20d14c.93ba6"]
    ],
 "output":true, 
 "data":[
      {
         "metadata_id":"3434",
         "id":"1",
         "first_name":"Brose",
         "last_name":"Eayres",
         "email":"beayres0@archive.org",
         "gender":"Male",
         "postal_code":null
      },
      {
         "metadata_id":"3434",
         "id":"2",
         "first_name":"Brose",
         "last_name":"Eayres",
         "email":"beayres0@archive.org",
         "gender":"Male",
         "postal_code":null
      }
   ]

 }

Теперь то, что я хочу сделать, - это перебирать этот массив данных один за другим: то есть перебирать первый объект json, сохранять его в фрейме данных, а затем перебирать второй объект и сохранять его в другом фрейме данных, а затем делать полный цикл. объединение или любое присоединение к ним (возможно)

Если да, то как это сделать в pyspark. Пока что я сделал
пытался взорвать его, но данные взрываются сразу, а не один

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

sc = SparkContext.getOrCreate()

dataFrame = spark.read.option("multiline", "true").json("nodeWithTwoRddJoin.json")

dataNode = dataFrame.select(explode("data").alias("Data_of_node"))

dataNode.show()

Но приведенный выше код дает мне совокупный набор данных. Чем я пользовался

firstDataSet = dataNode.collect()[0]
secondDataSet =  dataNode.collect()[1] 

Эти строки дают мне строку, которую я не могу объединить обратно в датафрейм Любые предложения и решения

Ответы [ 2 ]

1 голос
/ 04 июня 2019

Вам необходимо применить карту к каждой строке вашего информационного кадра, которая разбивает содержимое одного из его столбцов на два новых столбца. Разделение результатов этого на два кадра данных впоследствии тривиально. Для этого я использовал простую функцию, которая возвращает нужный индекс из массива:

def splitArray(array, pos):
    return array[pos]

Вы можете применить эту функцию следующим образом:

import pyspark.sql.functions as f

mapped = dataFrame.select(
    splitArray(f.col('data'), 0).alias('first'),
    splitArray(f.col('data'), 1).alias('second'))

(Я использовал функцию build in «col» для выбора столбца данных. Не уверен, что есть более элегантный способ добиться этого.)

Результат:

+-----------------------------------------------------+-----------------------------------------------------+
|first                                                |second                                               
|
+-----------------------------------------------------+-----------------------------------------------------+
|[beayres0@archive.org, Brose, Male, 1, Eayres, 3434,]|[beayres0@archive.org, Brose, Male, 2, Eayres, 3434,]|
+-----------------------------------------------------+-----------------------------------------------------+

Чтобы включить столбцы в разные dfs, просто выберите их:

firstDataSet = mapped.select('first')
secondDataSet =  mapped.select('second)
0 голосов
/ 04 июня 2019

Это помещает их в два кадра данных по крайней мере

from pyspark.sql.functions import monotonically_increasing_id

df_with_id = dataNode.withColumn("id",monotonically_increasing_id())

max_id = df_with_id.agg({"id": "max"}).collect()[0]["max(id)"]


first_df = df_with_id.where("id = {maxid}".format(maxid=max_id))
second_df = df_with_id.where("id != {maxid}".format(maxid=max_id))
...