Как преобразовать список объектов JSON в один фрейм данных Pyspark? - PullRequest
1 голос
/ 28 мая 2019

Я новичок в pyspark, у меня есть список jsons из API, каждый объект json имеет одну и ту же схему (пара ключ-значение).Например,

[ {'count': 308,
  'next': 'some_url',
  'previous': None,
  'results': [{'assigned_to': 43,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': {'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Alisha'}}]},
  {'count': 309,
  'next': 'some_url',
  'previous': None,
  'results': [{'assigned_to': 44,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': {'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Ali'}}]},......}]

, если бы это были отдельные файлы json.Я бы создал фрейм данных, используя

df =spark.read.json('myfile.json'), а затем слил бы все фреймы данных в один.Я сталкиваюсь с проблемой при преобразовании базы данных непосредственно из самого списка.Я использовал это

from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Basics").getOrCreate()
sc= spark.sparkContext
df = pyspark.sql.SQLContext(sc.parallelize(data_list))`

Это дает мне AttributeError: 'RDD' object has no attribute '_jsc'

1 Ответ

1 голос
/ 28 мая 2019

Я не смог найти прямой ответ на вашу проблему.Но это решение работает,

import json
import ast

df = sc.wholeTextFiles(path).map(lambda x:ast.literal_eval(x[1]))\
                            .map(lambda x: json.dumps(x))

df = spark.read.json(df)

Это даст вам вывод как,

+-----+--------+--------+--------------------+
|count|    next|previous|             results|
+-----+--------+--------+--------------------+
|  308|some_url|    null|[[43,Unused,null,...|
|  309|some_url|    null|[[44,Unused,null,...|
+-----+--------+--------+--------------------+

РЕДАКТИРОВАТЬ: Если оно находится в переменной, все, что вам нужно сделать, это

import json

df = sc.parallelize(data).map(lambda x: json.dumps(x))
df = spark.read.json(df)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...