Обработка вложенного json файла в pyspark с разделом - PullRequest
0 голосов
/ 04 августа 2020

У меня есть вложенные данные dict в файле json, хранящемся в hdfs (2 года ежедневных данных), я хотел бы обработать эти данные в pyspark с DATE, разделенными по столбцам. и сохраните его в таблице улья,

Я попытался сначала взорвать гнездо, чтобы получить плоскую структуру, но не понял Как я могу разделить по дате, поскольку он вложен, повторяется и динамически c .. каким должен быть мой подход?

df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')
dfRates = df.select(explode(array(df['rates']))).toDF("rates")
dfdate=dfRates.select("rates.2018-02-22.NZD")

# Drop the duplicated rows based on the base and date columns
forex_rates = df.select('Date', 'base', 'rates_BGN', 
'rates_CNY', 'rates_NZD').dropDuplicates(['base', 'Date']).fillna(0, subset= 
['BGN', 'CNY', 'NZD'])

# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")

Заранее спасибо.

sample data:
    
           {'rates': {
                        '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758}, 
                        '2018-01-09': {'BGN': 1.8558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
                      }, 
            'start_at': '2018-01-01', 
            'base': 'EUR', 
            'end_at': '2018-02-01'
           }
expected df structure:
            
+------------+------+-----------+-----------+-----------+
| Date       | Base | rates_BGN | rates_CNY | rates_NZD |
+------------+------+-----------+-----------+-----------+
| 2018-01-22 | EUR  | 1.9558    | 4.6552    | 7.8374    |
+------------+------+-----------+-----------+-----------+
| 2018-01-09 | EUR  | 1.8558    | 4.4843    | 7.7865    |
+------------+------+-----------+-----------+-----------+
| .......... | ...  | ......    | .....     | ......    |
+------------+------+-----------+-----------+-----------+

1 Ответ

0 голосов
/ 04 августа 2020

Этот код может быть вам полезен, введите JSON принято,

   {'rates': {
                '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758}, 
                '2018-01-09': {'BGN': 1.9558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
              }, 
    'start_at': '2018-01-01', 
    'base': 'EUR', 
    'end_at': '2018-02-01'
   }

код

from pyspark.sql.functions import *

df=spark.read.option("multiline","true").json("file:///home/sathya/test-datasets/forex_rates.json")

base=df.select("base").rdd.collect()[0].asDict()["base"]
start_at=df.select("start_at").rdd.collect()[0].asDict()["start_at"]
end_at=df.select("end_at").rdd.collect()[0].asDict()["end_at"]

df2=df.select("rates.*")
#python 2
stack_characteristics = str(len(df2.columns))+','+','.join(["'{}',`{}`".format(v,v) for v in df2.columns])

df2.select(expr('''stack({})'''.format(stack_characteristics)).alias('date','vals')).select('date', 'vals.*').withColumn("base",lit(base)).withColumn("start_at",lit(start_at)).withColumn("end_at",lit(end_at)).show()

#python3
#stack_characteristics = str(len(df.columns))+','+','.join([f"'{v}',`{v}`" for v in df.columns])

#df.select(expr(f'''stack({stack_characteristics})''').alias('date','vals')).select('date', 'vals.*').withColumn("base",lit(base)).withColumn("start_at",lit(start_at)).withColumn("end_at",lit(end_at)).show()
'''
+----------+------+------+------+------+------+----+----------+----------+
|      date|   BGN|   CNY|   NOK|   NZD|   TRY|base|  start_at|    end_at|
+----------+------+------+------+------+------+----+----------+----------+
|2018-01-09|1.9558|7.7865|9.6715|1.6601|4.4843| EUR|2018-01-01|2018-02-01|
|2018-01-22|1.9558|7.8374|9.6223|1.6758|4.6552| EUR|2018-01-01|2018-02-01|
+----------+------+------+------+------+------+----+----------+----------+
'''
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...