Как ускорить обработку нескольких json с помощью pySpark? - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть список json файлов в Databricks , и я пытаюсь прочитать каждый json , извлечь необходимые значения и затемдобавьте это в пустой кадр данных панд.Каждый файл json соответствует одной строке в конечном кадре данных.Начальная длина списка файлов json составляет 50k .До сих пор я построил функцию ниже, которая отлично справляется со своей задачей, но это занимает так много времени, что заставляет меня поместить список файлов json в 5k бинов и запускать каждый из них отдельно.,Это занимает 30 минут каждый.Я ограничен использованием только 3-узлового кластера в Databricks.

Есть ли шанс, что вы сможете повысить эффективность моей функции?Заранее спасибо.

### Create a big dataframe including all json files ###
def jsons_to_pdf(all_paths):
  # Create an empty pandas dataframes (it is defined only with column names)
  pdf = create_initial_pdf(samplefile)

  # Append each row into the above dataframe
  for path in all_paths:  
    # Create a spark dataframe
    sdf = sqlContext.read.json(path)

    # Create a two extracted lists of values
    init_values = sdf.select("id","logTimestamp","otherTimestamp").rdd.flatMap(lambda x: x).collect()
    id_values = sdf.select(sdf["dataPoints"]["value"]).rdd.flatMap(lambda x: x).collect()[0] 

    #Append the concatenated list each one as a row into the initial dataframe
    pdf.loc[len(pdf)] = init_values + id_values 

  return pdf

Один файл json выглядит следующим образом: enter image description here

И чего я хочу добиться, так это иметь dataPoints ['id'] в качестве новых столбцови dataPoints ['value'] как их значение, чтобы в итоге получилось: enter image description here

1 Ответ

0 голосов
/ 29 ноября 2018

Согласно вашему примеру, вы хотите выполнить поворот, а затем преобразовать ваши данные в pandas dataframe.

Шаги:

  • Соберите всех вас jsons в 1 большой массив данных,
  • Поверните ваши данные,
  • превратите их в кадр данных pandas

Попробуйте что-то вроде этого:

from functools import reduce 


def jsons_to_pdf(all_paths):

    # Create a big dataframe from all the jsons
    sdf = reduce(
        lambda a,b : a.union(b),
        [
            sqlContext.read.json(path)
            for path
            in all_paths
        ]
    )

    # select and pivot your data
    pivot_df = sdf.select(
        "imoNo",
        "logTimestamp",
        "payloadTimestamp",
        F.explode("datapoints").alias("datapoint")
    ).groupBy(
        "imoNo",
        "logTimestamp",
        "payloadTimestamp",
    ).pivot(
        "datapoint.id"
    ).sum("datapoint.value")

    # convert to a pandas dataframe
    pdf = pivot_df.toPandas()

    return pdf

Согласно вашему комментарию, вы можете заменить список файлов all_paths на общий путь и изменить способ создания sdf:

all_paths = 'abc/*/*/*' # 3x*, one for year, one for month, one for day

def jsons_to_pdf(all_paths):

    # Create a big dataframe from all the jsons
    sdf = sqlContext.read.json(path)

Это, несомненно, увеличит производительность.

...