Как создать вложенный диктонар в фрейме pyspark - PullRequest
1 голос
/ 05 января 2020

Team, мне нужна ваша помощь

Я новичок в Spark и пытаюсь создать вложенную структуру словаря в pyspark ... DataFrames.

Я обработал файл значений CSV и передается в функцию map для создания вложенной словарной структуры. Когда я обрабатываю данные внутри функции карты ... Значения вложенного словаря возвращаются в виде строки. Мне нужно, чтобы вложенный словарь был как dict.

Причина, по которой он конвертируется в String, заключается в том, что ... по умолчанию MapType в Spark обрабатывает Map(StringType, StringType, True)

Пример ввода :

Row(id=207224, id1=11839227, id2=65700, id3=162, TTimeStamp=datetime.datetime(2016, 12, 1, 1, 24, 11), pc=1, DateID_TimeStampUTC=20161201, ModelName=1120007, key=0, key2=5.0, key3=68.0, GbxBrgOilTmpGsAct=69.0, key4=72.0)

def process(row, signals_map, trb_id_u_id):
    signals = {}
    data = {}
    single_payload = {}
    filt_dt = {k: v for k, v in row.asDict().items() if k not in exclude_fields and v is not None}
    log.debug('this is filter data', filt_dt)
    for k, v in filt_dt.items():
        if k not in exclude_filter_fields:
            print('This is key', k)
            k = str(int(signals_map.value.get(k)))
            signals[k] = str(v)
        else:
            k = field_name_map.get(k)
            data[k] = str(v)
    data['signals'] = signals
    data['id'] = trb_id_u_id.value.get(str(data.get('src_trb_id')))
    data['ts_utc'] = derive_tsutc(data.get('ts_utc'))
    single_payload['insrt_ts'] = str(datetime.datetime.now())
    single_payload['data'] = data
    return single_payload


    fnl_data = hist_data.rdd.map(lambda x: process(x,broadcastVar1,broadcastVar2)).toDF()

Текущий вывод

{
    "data" : {
        "signals" : "{Key1:Value1,Key2:Value2,Key3:Value3}",
        "id" : "1234",
        "ts_utc" : "1480555451000",
        "pc" : "1"
    },
    "insrt_ts" : "2020-01-03 12:56:13.808887"
}

Требуемый формат вывода:

{
    "data" : {
        "signals" : {
            "Key1":"Value1",
            "Key2":"Value2",
            "Key3":"Value3"
        },
        "id" : "1234",
        "ts_utc" : "1480555451000",
        "pc" : "1"
    },
    "insrt_ts" : "2020-01-03 12:56:13.808887"
}

Помощь по крайней мере в преобразование этой строки информационного кадра во вложенный dict в pyspark:

**input dafarame :** 
`Row({"Key1":0,"Key2":5.0,"Key3":68.0,"Key4":69.0,"key5":72.0,"ts_utc":1480555451000,"id":207224,"9.0":9.1000003815})`

**required structure:**

{'data':{'signals':{Key1":1,
                    "Key2":2,
                    "Key3":3,
                    "Key4":4,
                    "key5":5}}
                    "ts_utc":1480555451000,
                    "id":207224

                    }

1 Ответ

0 голосов
/ 05 января 2020

Вы можете определить свои собственные схемы Spark, чтобы считывать данные определенным c способом (не позволяя Spark определять типы). (Для получения дополнительной информации дважды проверьте следующую ссылку: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#programmatically -specifying-the-schema ). В этом случае, чтобы иметь словарь внутри сигналов, вы можете определить MapType (какие ключи и значения имеют StringType)

Ниже вы можете найти возможное решение для показанного ввода данных.

from pyspark.sql.types import StructType, StructField, StringType, MapType

ownSchema = StructType([
    StructField("data", StructType([
      StructField("signals", MapType(StringType(), StringType())),
      StructField("id", StringType()),
      StructField("ts_utc", StringType()),
      StructField("pc", StringType()),
    ])), 
    StructField("insrt_ts", StringType()) 
])

И затем вы можете прочитать данные, используя что-то вроде следующего: spark.createDataFrame(data, schema=ownSchema...)

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...